reth_downloaders/
file_client.rs

1use alloy_consensus::BlockHeader;
2use alloy_eips::BlockHashOrNumber;
3use alloy_primitives::{BlockHash, BlockNumber, Sealable, B256};
4use futures::Future;
5use itertools::Either;
6use reth_consensus::{Consensus, ConsensusError};
7use reth_network_p2p::{
8    bodies::client::{BodiesClient, BodiesFut},
9    download::DownloadClient,
10    error::RequestError,
11    headers::client::{HeadersClient, HeadersDirection, HeadersFut, HeadersRequest},
12    priority::Priority,
13    BlockClient,
14};
15use reth_network_peers::PeerId;
16use reth_primitives_traits::{Block, BlockBody, FullBlock, SealedBlock, SealedHeader};
17use std::{collections::HashMap, io, ops::RangeInclusive, path::Path, sync::Arc};
18use thiserror::Error;
19use tokio::{fs::File, io::AsyncReadExt};
20use tokio_stream::StreamExt;
21use tokio_util::codec::FramedRead;
22use tracing::{debug, trace, warn};
23
24use super::file_codec::BlockFileCodec;
25use crate::receipt_file_client::FromReceiptReader;
26
/// Number of bytes read from the chain file per chunk when the caller does not specify a chunk
/// length.
///
/// The value is 1 GB (`1_000_000_000` bytes).
pub const DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE: u64 = 1_000_000_000;
31
32/// Front-end API for fetching chain data from a file.
33///
34/// Blocks are assumed to be written one after another in a file, as rlp bytes.
35///
36/// For example, if the file contains 3 blocks, the file is assumed to be encoded as follows:
37/// rlp(block1) || rlp(block2) || rlp(block3)
38///
39/// Blocks are assumed to have populated transactions, so reading headers will also buffer
40/// transactions in memory for use in the bodies stage.
41///
42/// This reads the entire file into memory, so it is not suitable for large files.
43#[derive(Debug, Clone)]
44pub struct FileClient<B: Block> {
45    /// The buffered headers retrieved when fetching new bodies.
46    headers: HashMap<BlockNumber, B::Header>,
47
48    /// A mapping between block hash and number.
49    hash_to_number: HashMap<BlockHash, BlockNumber>,
50
51    /// The buffered bodies retrieved when fetching new headers.
52    bodies: HashMap<BlockHash, B::Body>,
53}
54
55/// An error that can occur when constructing and using a [`FileClient`].
56#[derive(Debug, Error)]
57pub enum FileClientError {
58    /// An error occurred when validating a header from file.
59    #[error(transparent)]
60    Consensus(#[from] ConsensusError),
61
62    /// An error occurred when opening or reading the file.
63    #[error(transparent)]
64    Io(#[from] std::io::Error),
65
66    /// An error occurred when decoding blocks, headers, or rlp headers from the file.
67    #[error("{0}")]
68    Rlp(alloy_rlp::Error, Vec<u8>),
69
70    /// Custom error message.
71    #[error("{0}")]
72    Custom(&'static str),
73}
74
75impl From<&'static str> for FileClientError {
76    fn from(value: &'static str) -> Self {
77        Self::Custom(value)
78    }
79}
80
81impl<B: FullBlock> FileClient<B> {
82    /// Create a new file client from a file path.
83    pub async fn new<P: AsRef<Path>>(
84        path: P,
85        consensus: Arc<dyn Consensus<B, Error = ConsensusError>>,
86    ) -> Result<Self, FileClientError> {
87        let file = File::open(path).await?;
88        Self::from_file(file, consensus).await
89    }
90
91    /// Initialize the [`FileClient`] with a file directly.
92    pub(crate) async fn from_file(
93        mut file: File,
94        consensus: Arc<dyn Consensus<B, Error = ConsensusError>>,
95    ) -> Result<Self, FileClientError> {
96        // get file len from metadata before reading
97        let metadata = file.metadata().await?;
98        let file_len = metadata.len();
99
100        let mut reader = vec![];
101        file.read_to_end(&mut reader).await?;
102
103        Ok(FileClientBuilder { consensus, parent_header: None }
104            .build(&reader[..], file_len)
105            .await?
106            .file_client)
107    }
108
109    /// Get the tip hash of the chain.
110    pub fn tip(&self) -> Option<B256> {
111        self.headers.get(&self.max_block()?).map(|h| h.hash_slow())
112    }
113
114    /// Get the start hash of the chain.
115    pub fn start(&self) -> Option<B256> {
116        self.headers.get(&self.min_block()?).map(|h| h.hash_slow())
117    }
118
119    /// Returns the highest block number of this client has or `None` if empty
120    pub fn max_block(&self) -> Option<u64> {
121        self.headers.keys().max().copied()
122    }
123
124    /// Returns the lowest block number of this client has or `None` if empty
125    pub fn min_block(&self) -> Option<u64> {
126        self.headers.keys().min().copied()
127    }
128
129    /// Clones and returns the highest header of this client has or `None` if empty. Seals header
130    /// before returning.
131    pub fn tip_header(&self) -> Option<SealedHeader<B::Header>> {
132        self.headers.get(&self.max_block()?).map(|h| SealedHeader::seal_slow(h.clone()))
133    }
134
135    /// Returns true if all blocks are canonical (no gaps)
136    pub fn has_canonical_blocks(&self) -> bool {
137        if self.headers.is_empty() {
138            return true
139        }
140        let mut nums = self.headers.keys().copied().collect::<Vec<_>>();
141        nums.sort_unstable();
142        let mut iter = nums.into_iter();
143        let mut lowest = iter.next().expect("not empty");
144        for next in iter {
145            if next != lowest + 1 {
146                return false
147            }
148            lowest = next;
149        }
150        true
151    }
152
153    /// Use the provided bodies as the file client's block body buffer.
154    pub fn with_bodies(mut self, bodies: HashMap<BlockHash, B::Body>) -> Self {
155        self.bodies = bodies;
156        self
157    }
158
159    /// Use the provided headers as the file client's block body buffer.
160    pub fn with_headers(mut self, headers: HashMap<BlockNumber, B::Header>) -> Self {
161        self.headers = headers;
162        for (number, header) in &self.headers {
163            self.hash_to_number.insert(header.hash_slow(), *number);
164        }
165        self
166    }
167
168    /// Returns the current number of headers in the client.
169    pub fn headers_len(&self) -> usize {
170        self.headers.len()
171    }
172
173    /// Returns the current number of bodies in the client.
174    pub fn bodies_len(&self) -> usize {
175        self.bodies.len()
176    }
177
178    /// Returns an iterator over headers in the client.
179    pub fn headers_iter(&self) -> impl Iterator<Item = &B::Header> {
180        self.headers.values()
181    }
182
183    /// Returns a mutable iterator over bodies in the client.
184    ///
185    /// Panics, if file client headers and bodies are not mapping 1-1.
186    pub fn bodies_iter_mut(&mut self) -> impl Iterator<Item = (u64, &mut B::Body)> {
187        let bodies = &mut self.bodies;
188        let numbers = &self.hash_to_number;
189        bodies.iter_mut().map(|(hash, body)| (numbers[hash], body))
190    }
191
192    /// Returns the current number of transactions in the client.
193    pub fn total_transactions(&self) -> usize {
194        self.bodies.iter().fold(0, |acc, (_, body)| acc + body.transactions().len())
195    }
196}
197
198struct FileClientBuilder<B: Block> {
199    pub consensus: Arc<dyn Consensus<B, Error = ConsensusError>>,
200    pub parent_header: Option<SealedHeader<B::Header>>,
201}
202
203impl<B: FullBlock<Header: reth_primitives_traits::BlockHeader>> FromReader
204    for FileClientBuilder<B>
205{
206    type Error = FileClientError;
207    type Output = FileClient<B>;
208
209    /// Initialize the [`FileClient`] from bytes that have been read from file.
210    fn build<R>(
211        &self,
212        reader: R,
213        num_bytes: u64,
214    ) -> impl Future<Output = Result<DecodedFileChunk<Self::Output>, Self::Error>>
215    where
216        R: AsyncReadExt + Unpin,
217    {
218        let mut headers = HashMap::default();
219        let mut hash_to_number = HashMap::default();
220        let mut bodies = HashMap::default();
221
222        // use with_capacity to make sure the internal buffer contains the entire chunk
223        let mut stream =
224            FramedRead::with_capacity(reader, BlockFileCodec::<B>::default(), num_bytes as usize);
225
226        trace!(target: "downloaders::file",
227            target_num_bytes=num_bytes,
228            capacity=stream.read_buffer().capacity(),
229            "init decode stream"
230        );
231
232        let mut remaining_bytes = vec![];
233
234        let mut log_interval = 0;
235        let mut log_interval_start_block = 0;
236
237        let mut parent_header = self.parent_header.clone();
238
239        async move {
240            while let Some(block_res) = stream.next().await {
241                let block = match block_res {
242                    Ok(block) => block,
243                    Err(FileClientError::Rlp(err, bytes)) => {
244                        trace!(target: "downloaders::file",
245                            %err,
246                            bytes_len=bytes.len(),
247                            "partial block returned from decoding chunk"
248                        );
249                        remaining_bytes = bytes;
250                        break
251                    }
252                    Err(err) => return Err(err),
253                };
254
255                let block = SealedBlock::seal_slow(block);
256
257                // Validate standalone header
258                self.consensus.validate_header(block.sealed_header())?;
259                if let Some(parent) = &parent_header {
260                    self.consensus.validate_header_against_parent(block.sealed_header(), parent)?;
261                    parent_header = Some(block.sealed_header().clone());
262                }
263
264                // Validate block against header
265                self.consensus.validate_block_pre_execution(&block)?;
266
267                // add to the internal maps
268                let block_hash = block.hash();
269                let block_number = block.number();
270                let (header, body) = block.split_sealed_header_body();
271                headers.insert(block_number, header.unseal());
272                hash_to_number.insert(block_hash, block_number);
273                bodies.insert(block_hash, body);
274
275                if log_interval == 0 {
276                    trace!(target: "downloaders::file",
277                        block_number,
278                        "read first block"
279                    );
280                    log_interval_start_block = block_number;
281                } else if log_interval % 100_000 == 0 {
282                    trace!(target: "downloaders::file",
283                        blocks=?log_interval_start_block..=block_number,
284                        "read blocks from file"
285                    );
286                    log_interval_start_block = block_number + 1;
287                }
288                log_interval += 1;
289            }
290
291            trace!(target: "downloaders::file", blocks = headers.len(), "Initialized file client");
292
293            Ok(DecodedFileChunk {
294                file_client: FileClient { headers, hash_to_number, bodies },
295                remaining_bytes,
296                highest_block: None,
297            })
298        }
299    }
300}
301
302impl<B: FullBlock> HeadersClient for FileClient<B> {
303    type Header = B::Header;
304    type Output = HeadersFut<B::Header>;
305
306    fn get_headers_with_priority(
307        &self,
308        request: HeadersRequest,
309        _priority: Priority,
310    ) -> Self::Output {
311        // this just searches the buffer, and fails if it can't find the header
312        let mut headers = Vec::new();
313        trace!(target: "downloaders::file", request=?request, "Getting headers");
314
315        let start_num = match request.start {
316            BlockHashOrNumber::Hash(hash) => match self.hash_to_number.get(&hash) {
317                Some(num) => *num,
318                None => {
319                    warn!(%hash, "Could not find starting block number for requested header hash");
320                    return Box::pin(async move { Err(RequestError::BadResponse) })
321                }
322            },
323            BlockHashOrNumber::Number(num) => num,
324        };
325
326        let range = if request.limit == 1 {
327            Either::Left(start_num..start_num + 1)
328        } else {
329            match request.direction {
330                HeadersDirection::Rising => Either::Left(start_num..start_num + request.limit),
331                HeadersDirection::Falling => {
332                    Either::Right((start_num - request.limit + 1..=start_num).rev())
333                }
334            }
335        };
336
337        trace!(target: "downloaders::file", range=?range, "Getting headers with range");
338
339        for block_number in range {
340            match self.headers.get(&block_number).cloned() {
341                Some(header) => headers.push(header),
342                None => {
343                    warn!(number=%block_number, "Could not find header");
344                    return Box::pin(async move { Err(RequestError::BadResponse) })
345                }
346            }
347        }
348
349        Box::pin(async move { Ok((PeerId::default(), headers).into()) })
350    }
351}
352
353impl<B: FullBlock> BodiesClient for FileClient<B> {
354    type Body = B::Body;
355    type Output = BodiesFut<B::Body>;
356
357    fn get_block_bodies_with_priority_and_range_hint(
358        &self,
359        hashes: Vec<B256>,
360        _priority: Priority,
361        _range_hint: Option<RangeInclusive<u64>>,
362    ) -> Self::Output {
363        // this just searches the buffer, and fails if it can't find the block
364        let mut bodies = Vec::new();
365
366        // check if any are an error
367        // could unwrap here
368        for hash in hashes {
369            match self.bodies.get(&hash).cloned() {
370                Some(body) => bodies.push(body),
371                None => return Box::pin(async move { Err(RequestError::BadResponse) }),
372            }
373        }
374
375        Box::pin(async move { Ok((PeerId::default(), bodies).into()) })
376    }
377}
378
379impl<B: FullBlock> DownloadClient for FileClient<B> {
380    fn report_bad_message(&self, _peer_id: PeerId) {
381        trace!("Reported a bad message on a file client, the file may be corrupted or invalid");
382        // noop
383    }
384
385    fn num_connected_peers(&self) -> usize {
386        // no such thing as connected peers when we are just using a file
387        1
388    }
389}
390
391impl<B: FullBlock> BlockClient for FileClient<B> {
392    type Block = B;
393}
394
395/// Chunks file into several [`FileClient`]s.
396#[derive(Debug)]
397pub struct ChunkedFileReader {
398    /// File to read from.
399    file: File,
400    /// Current file byte length.
401    file_byte_len: u64,
402    /// Bytes that have been read.
403    chunk: Vec<u8>,
404    /// Max bytes per chunk.
405    chunk_byte_len: u64,
406    /// Optionally, tracks highest decoded block number. Needed when decoding data that maps * to 1
407    /// with block number
408    highest_block: Option<u64>,
409}
410
411impl ChunkedFileReader {
412    /// Returns the remaining file length.
413    pub const fn file_len(&self) -> u64 {
414        self.file_byte_len
415    }
416
417    /// Opens the file to import from given path. Returns a new instance. If no chunk byte length
418    /// is passed, chunks have [`DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE`] (one static file).
419    pub async fn new<P: AsRef<Path>>(
420        path: P,
421        chunk_byte_len: Option<u64>,
422    ) -> Result<Self, FileClientError> {
423        let file = File::open(path).await?;
424        let chunk_byte_len = chunk_byte_len.unwrap_or(DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE);
425
426        Self::from_file(file, chunk_byte_len).await
427    }
428
429    /// Opens the file to import from given path. Returns a new instance.
430    pub async fn from_file(file: File, chunk_byte_len: u64) -> Result<Self, FileClientError> {
431        // get file len from metadata before reading
432        let metadata = file.metadata().await?;
433        let file_byte_len = metadata.len();
434
435        Ok(Self { file, file_byte_len, chunk: vec![], chunk_byte_len, highest_block: None })
436    }
437
438    /// Calculates the number of bytes to read from the chain file. Returns a tuple of the chunk
439    /// length and the remaining file length.
440    const fn chunk_len(&self) -> u64 {
441        let Self { chunk_byte_len, file_byte_len, .. } = *self;
442        let file_byte_len = file_byte_len + self.chunk.len() as u64;
443
444        if chunk_byte_len > file_byte_len {
445            // last chunk
446            file_byte_len
447        } else {
448            chunk_byte_len
449        }
450    }
451
452    /// Reads bytes from file and buffers as next chunk to decode. Returns byte length of next
453    /// chunk to read.
454    async fn read_next_chunk(&mut self) -> Result<Option<u64>, io::Error> {
455        if self.file_byte_len == 0 && self.chunk.is_empty() {
456            // eof
457            return Ok(None)
458        }
459
460        let chunk_target_len = self.chunk_len();
461        let old_bytes_len = self.chunk.len() as u64;
462
463        // calculate reserved space in chunk
464        let new_read_bytes_target_len = chunk_target_len - old_bytes_len;
465
466        // read new bytes from file
467        let prev_read_bytes_len = self.chunk.len();
468        self.chunk.extend(std::iter::repeat_n(0, new_read_bytes_target_len as usize));
469        let reader = &mut self.chunk[prev_read_bytes_len..];
470
471        // actual bytes that have been read
472        let new_read_bytes_len = self.file.read_exact(reader).await? as u64;
473        let next_chunk_byte_len = self.chunk.len();
474
475        // update remaining file length
476        self.file_byte_len -= new_read_bytes_len;
477
478        debug!(target: "downloaders::file",
479            max_chunk_byte_len=self.chunk_byte_len,
480            prev_read_bytes_len,
481            new_read_bytes_target_len,
482            new_read_bytes_len,
483            next_chunk_byte_len,
484            remaining_file_byte_len=self.file_byte_len,
485            "new bytes were read from file"
486        );
487
488        Ok(Some(next_chunk_byte_len as u64))
489    }
490
491    /// Read next chunk from file. Returns [`FileClient`] containing decoded chunk.
492    pub async fn next_chunk<B: FullBlock>(
493        &mut self,
494        consensus: Arc<dyn Consensus<B, Error = ConsensusError>>,
495        parent_header: Option<SealedHeader<B::Header>>,
496    ) -> Result<Option<FileClient<B>>, FileClientError> {
497        let Some(next_chunk_byte_len) = self.read_next_chunk().await? else { return Ok(None) };
498
499        // make new file client from chunk
500        let DecodedFileChunk { file_client, remaining_bytes, .. } =
501            FileClientBuilder { consensus, parent_header }
502                .build(&self.chunk[..], next_chunk_byte_len)
503                .await?;
504
505        // save left over bytes
506        self.chunk = remaining_bytes;
507
508        Ok(Some(file_client))
509    }
510
511    /// Read next chunk from file. Returns [`FileClient`] containing decoded chunk.
512    pub async fn next_receipts_chunk<T>(&mut self) -> Result<Option<T>, T::Error>
513    where
514        T: FromReceiptReader,
515    {
516        let Some(next_chunk_byte_len) = self.read_next_chunk().await? else { return Ok(None) };
517
518        // make new file client from chunk
519        let DecodedFileChunk { file_client, remaining_bytes, highest_block } =
520            T::from_receipt_reader(&self.chunk[..], next_chunk_byte_len, self.highest_block)
521                .await?;
522
523        // save left over bytes
524        self.chunk = remaining_bytes;
525        // update highest block
526        self.highest_block = highest_block;
527
528        Ok(Some(file_client))
529    }
530}
531
532/// Constructs a file client from a reader.
533pub trait FromReader {
534    /// Error returned by file client type.
535    type Error: From<io::Error>;
536
537    /// Output returned by file client type.
538    type Output;
539
540    /// Returns a file client
541    fn build<R>(
542        &self,
543        reader: R,
544        num_bytes: u64,
545    ) -> impl Future<Output = Result<DecodedFileChunk<Self::Output>, Self::Error>>
546    where
547        Self: Sized,
548        R: AsyncReadExt + Unpin;
549}
550
/// Result of decoding one file chunk with [`FromReader::build`].
#[derive(Debug)]
pub struct DecodedFileChunk<T> {
    /// The file client built from the part of the chunk that could be decoded.
    pub file_client: T,
    /// Bytes of the chunk that were not decoded, e.g. a partial block or a partial receipt.
    pub remaining_bytes: Vec<u8>,
    /// Highest block of the decoded chunk. Needed when decoding data that maps * to 1 with block
    /// number, like receipts.
    pub highest_block: Option<u64>,
}
562
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        bodies::{
            bodies::BodiesDownloaderBuilder,
            test_utils::{insert_headers, zip_blocks},
        },
        headers::{reverse_headers::ReverseHeadersDownloaderBuilder, test_utils::child_header},
        test_utils::{generate_bodies, generate_bodies_file},
    };
    use assert_matches::assert_matches;
    use futures_util::stream::StreamExt;
    use rand::Rng;
    use reth_consensus::{noop::NoopConsensus, test_utils::TestConsensus};
    use reth_ethereum_primitives::Block;
    use reth_network_p2p::{
        bodies::downloader::BodyDownloader,
        headers::downloader::{HeaderDownloader, SyncTarget},
    };
    use reth_provider::test_utils::create_test_provider_factory;
    use std::sync::Arc;

    /// Bodies injected with `with_bodies` are served to the bodies downloader.
    #[tokio::test]
    async fn streams_bodies_from_buffer() {
        // Generate some random blocks
        let factory = create_test_provider_factory();
        let (headers, mut bodies) = generate_bodies(0..=19);

        insert_headers(factory.db_ref().db(), &headers);

        // create an empty file
        let file = tempfile::tempfile().unwrap();

        let client: Arc<FileClient<Block>> = Arc::new(
            FileClient::from_file(file.into(), NoopConsensus::arc())
                .await
                .unwrap()
                .with_bodies(bodies.clone()),
        );
        let mut downloader = BodiesDownloaderBuilder::default().build::<Block, _, _>(
            client.clone(),
            Arc::new(TestConsensus::default()),
            factory,
        );
        downloader.set_download_range(0..=19).expect("failed to set download range");

        assert_matches!(
            downloader.next().await,
            Some(Ok(res)) => assert_eq!(res, zip_blocks(headers.iter(), &mut bodies))
        );
    }

    /// Headers injected with `with_headers` are served to the reverse headers downloader.
    #[tokio::test]
    async fn download_headers_at_fork_head() {
        reth_tracing::init_test_tracing();

        let p3 = SealedHeader::default();
        let p2 = child_header(&p3);
        let p1 = child_header(&p2);
        let p0 = child_header(&p1);

        let file = tempfile::tempfile().unwrap();
        let client: Arc<FileClient<Block>> = Arc::new(
            FileClient::from_file(file.into(), NoopConsensus::arc()).await.unwrap().with_headers(
                HashMap::from([
                    (0u64, p0.clone_header()),
                    (1, p1.clone_header()),
                    (2, p2.clone_header()),
                    (3, p3.clone_header()),
                ]),
            ),
        );

        let mut downloader = ReverseHeadersDownloaderBuilder::default()
            .stream_batch_size(3)
            .request_limit(3)
            .build(Arc::clone(&client), Arc::new(TestConsensus::default()));
        downloader.update_local_head(p3.clone());
        downloader.update_sync_target(SyncTarget::Tip(p0.hash()));

        let headers = downloader.next().await.unwrap();
        assert_eq!(headers, Ok(vec![p0, p1, p2]));
        assert!(downloader.next().await.is_none());
        assert!(downloader.next().await.is_none());
    }

    /// Headers decoded from a generated block file can be downloaded again.
    #[tokio::test]
    async fn test_download_headers_from_file() {
        reth_tracing::init_test_tracing();

        // Generate some random blocks
        let (file, headers, _) = generate_bodies_file(0..=19).await;
        // now try to read them back
        let client: Arc<FileClient<Block>> =
            Arc::new(FileClient::from_file(file, NoopConsensus::arc()).await.unwrap());

        // construct headers downloader and use first header
        let mut header_downloader = ReverseHeadersDownloaderBuilder::default()
            .build(Arc::clone(&client), Arc::new(TestConsensus::default()));
        header_downloader.update_local_head(headers.first().unwrap().clone());
        header_downloader.update_sync_target(SyncTarget::Tip(headers.last().unwrap().hash()));

        // get headers first
        let mut downloaded_headers = header_downloader.next().await.unwrap().unwrap();

        // reverse to make sure it's in the right order before comparing
        downloaded_headers.reverse();

        // the first header is not included in the response
        assert_eq!(downloaded_headers, headers[1..]);
    }

    /// Bodies decoded from a generated block file can be downloaded again.
    #[tokio::test]
    async fn test_download_bodies_from_file() {
        // Generate some random blocks
        let factory = create_test_provider_factory();
        let (file, headers, mut bodies) = generate_bodies_file(0..=19).await;

        // now try to read them back
        let client: Arc<FileClient<Block>> =
            Arc::new(FileClient::from_file(file, NoopConsensus::arc()).await.unwrap());

        // insert headers in db for the bodies downloader
        insert_headers(factory.db_ref().db(), &headers);

        let mut downloader = BodiesDownloaderBuilder::default().build::<Block, _, _>(
            client.clone(),
            Arc::new(TestConsensus::default()),
            factory,
        );
        downloader.set_download_range(0..=19).expect("failed to set download range");

        assert_matches!(
            downloader.next().await,
            Some(Ok(res)) => assert_eq!(res, zip_blocks(headers.iter(), &mut bodies))
        );
    }

    /// Reading a block file chunk by chunk yields the same headers as reading it at once.
    #[tokio::test]
    async fn test_chunk_download_headers_from_file() {
        reth_tracing::init_test_tracing();

        // Generate some random blocks
        let (file, headers, _) = generate_bodies_file(0..=14).await;

        // calculate min for chunk byte length range, pick a lower bound that guarantees at least
        // one block will be read
        let chunk_byte_len = rand::rng().random_range(2000..=10_000u64);
        trace!(target: "downloaders::file::test", chunk_byte_len);

        // init reader
        let mut reader = ChunkedFileReader::from_file(file, chunk_byte_len).await.unwrap();

        let mut downloaded_headers: Vec<SealedHeader> = vec![];

        let mut local_header = headers.first().unwrap().clone();

        // test
        while let Some(client) =
            reader.next_chunk::<Block>(NoopConsensus::arc(), None).await.unwrap()
        {
            let sync_target = client.tip_header().unwrap();

            let sync_target_hash = sync_target.hash();

            // construct headers downloader and use first header
            let mut header_downloader = ReverseHeadersDownloaderBuilder::default()
                .build(Arc::new(client), Arc::new(TestConsensus::default()));
            header_downloader.update_local_head(local_header.clone());
            header_downloader.update_sync_target(SyncTarget::Tip(sync_target_hash));

            // get headers first
            let mut downloaded_headers_chunk = header_downloader.next().await.unwrap().unwrap();

            // export new local header to outer scope
            local_header = sync_target;

            // reverse to make sure it's in the right order before comparing
            downloaded_headers_chunk.reverse();
            downloaded_headers.extend_from_slice(&downloaded_headers_chunk);
        }

        // the first header is not included in the response
        assert_eq!(headers[1..], downloaded_headers);
    }
}