reth_downloaders/
file_client.rs

1use alloy_consensus::BlockHeader;
2use alloy_eips::BlockHashOrNumber;
3use alloy_primitives::{BlockHash, BlockNumber, Sealable, B256};
4use futures::Future;
5use itertools::Either;
6use reth_consensus::{Consensus, ConsensusError};
7use reth_network_p2p::{
8    bodies::client::{BodiesClient, BodiesFut},
9    download::DownloadClient,
10    error::RequestError,
11    headers::client::{HeadersClient, HeadersDirection, HeadersFut, HeadersRequest},
12    priority::Priority,
13    BlockClient,
14};
15use reth_network_peers::PeerId;
16use reth_primitives_traits::{Block, BlockBody, FullBlock, SealedBlock, SealedHeader};
17use std::{collections::HashMap, io, path::Path, sync::Arc};
18use thiserror::Error;
19use tokio::{fs::File, io::AsyncReadExt};
20use tokio_stream::StreamExt;
21use tokio_util::codec::FramedRead;
22use tracing::{debug, trace, warn};
23
24use super::file_codec::BlockFileCodec;
25use crate::receipt_file_client::FromReceiptReader;
26
/// Default byte length of chunk to read from chain file.
///
/// Default is 1 GB, i.e. `1_000_000_000` bytes (decimal gigabyte). [`ChunkedFileReader::new`]
/// falls back to this value when no chunk byte length is passed.
pub const DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE: u64 = 1_000_000_000;
31
/// Front-end API for fetching chain data from a file.
///
/// Blocks are assumed to be written one after another in a file, as rlp bytes.
///
/// For example, if the file contains 3 blocks, the file is assumed to be encoded as follows:
/// rlp(block1) || rlp(block2) || rlp(block3)
///
/// Blocks are assumed to have populated transactions, so reading headers will also buffer
/// transactions in memory for use in the bodies stage.
///
/// This reads the entire file into memory, so it is not suitable for large files. Use
/// [`ChunkedFileReader`] to decode a file piece by piece instead.
#[derive(Debug, Clone)]
pub struct FileClient<B: Block> {
    /// The buffered headers retrieved when fetching new bodies, keyed by block number.
    headers: HashMap<BlockNumber, B::Header>,

    /// A mapping between block hash and number.
    ///
    /// Used to resolve header requests that start at a hash and to report the block number of
    /// each buffered body.
    hash_to_number: HashMap<BlockHash, BlockNumber>,

    /// The buffered bodies retrieved when fetching new headers, keyed by block hash.
    bodies: HashMap<BlockHash, B::Body>,
}
54
/// An error that can occur when constructing and using a [`FileClient`].
#[derive(Debug, Error)]
pub enum FileClientError {
    /// An error occurred when validating a header from file.
    #[error(transparent)]
    Consensus(#[from] ConsensusError),

    /// An error occurred when opening or reading the file.
    #[error(transparent)]
    Io(#[from] std::io::Error),

    /// An error occurred when decoding blocks, headers, or rlp headers from the file.
    ///
    /// The second field carries the bytes associated with the failed decode. The chunk decoder
    /// in this file keeps them as the not yet decoded remainder of the chunk (e.g. a partial
    /// block) instead of failing. Only the rlp error is part of the display message.
    #[error("{0}")]
    Rlp(alloy_rlp::Error, Vec<u8>),

    /// Custom error message.
    #[error("{0}")]
    Custom(&'static str),
}
74
75impl From<&'static str> for FileClientError {
76    fn from(value: &'static str) -> Self {
77        Self::Custom(value)
78    }
79}
80
81impl<B: FullBlock> FileClient<B> {
82    /// Create a new file client from a file path.
83    pub async fn new<P: AsRef<Path>>(
84        path: P,
85        consensus: Arc<dyn Consensus<B, Error = ConsensusError>>,
86    ) -> Result<Self, FileClientError> {
87        let file = File::open(path).await?;
88        Self::from_file(file, consensus).await
89    }
90
91    /// Initialize the [`FileClient`] with a file directly.
92    pub(crate) async fn from_file(
93        mut file: File,
94        consensus: Arc<dyn Consensus<B, Error = ConsensusError>>,
95    ) -> Result<Self, FileClientError> {
96        // get file len from metadata before reading
97        let metadata = file.metadata().await?;
98        let file_len = metadata.len();
99
100        let mut reader = vec![];
101        file.read_to_end(&mut reader).await?;
102
103        Ok(FileClientBuilder { consensus, parent_header: None }
104            .build(&reader[..], file_len)
105            .await?
106            .file_client)
107    }
108
109    /// Get the tip hash of the chain.
110    pub fn tip(&self) -> Option<B256> {
111        self.headers.get(&self.max_block()?).map(|h| h.hash_slow())
112    }
113
114    /// Get the start hash of the chain.
115    pub fn start(&self) -> Option<B256> {
116        self.headers.get(&self.min_block()?).map(|h| h.hash_slow())
117    }
118
119    /// Returns the highest block number of this client has or `None` if empty
120    pub fn max_block(&self) -> Option<u64> {
121        self.headers.keys().max().copied()
122    }
123
124    /// Returns the lowest block number of this client has or `None` if empty
125    pub fn min_block(&self) -> Option<u64> {
126        self.headers.keys().min().copied()
127    }
128
129    /// Clones and returns the highest header of this client has or `None` if empty. Seals header
130    /// before returning.
131    pub fn tip_header(&self) -> Option<SealedHeader<B::Header>> {
132        self.headers.get(&self.max_block()?).map(|h| SealedHeader::seal_slow(h.clone()))
133    }
134
135    /// Returns true if all blocks are canonical (no gaps)
136    pub fn has_canonical_blocks(&self) -> bool {
137        if self.headers.is_empty() {
138            return true
139        }
140        let mut nums = self.headers.keys().copied().collect::<Vec<_>>();
141        nums.sort_unstable();
142        let mut iter = nums.into_iter();
143        let mut lowest = iter.next().expect("not empty");
144        for next in iter {
145            if next != lowest + 1 {
146                return false
147            }
148            lowest = next;
149        }
150        true
151    }
152
153    /// Use the provided bodies as the file client's block body buffer.
154    pub fn with_bodies(mut self, bodies: HashMap<BlockHash, B::Body>) -> Self {
155        self.bodies = bodies;
156        self
157    }
158
159    /// Use the provided headers as the file client's block body buffer.
160    pub fn with_headers(mut self, headers: HashMap<BlockNumber, B::Header>) -> Self {
161        self.headers = headers;
162        for (number, header) in &self.headers {
163            self.hash_to_number.insert(header.hash_slow(), *number);
164        }
165        self
166    }
167
168    /// Returns the current number of headers in the client.
169    pub fn headers_len(&self) -> usize {
170        self.headers.len()
171    }
172
173    /// Returns the current number of bodies in the client.
174    pub fn bodies_len(&self) -> usize {
175        self.bodies.len()
176    }
177
178    /// Returns an iterator over headers in the client.
179    pub fn headers_iter(&self) -> impl Iterator<Item = &B::Header> {
180        self.headers.values()
181    }
182
183    /// Returns a mutable iterator over bodies in the client.
184    ///
185    /// Panics, if file client headers and bodies are not mapping 1-1.
186    pub fn bodies_iter_mut(&mut self) -> impl Iterator<Item = (u64, &mut B::Body)> {
187        let bodies = &mut self.bodies;
188        let numbers = &self.hash_to_number;
189        bodies.iter_mut().map(|(hash, body)| (numbers[hash], body))
190    }
191
192    /// Returns the current number of transactions in the client.
193    pub fn total_transactions(&self) -> usize {
194        self.bodies.iter().fold(0, |acc, (_, body)| acc + body.transactions().len())
195    }
196}
197
/// Builds a [`FileClient`] by decoding and validating blocks from a reader, see its
/// [`FromReader`] implementation.
struct FileClientBuilder<B: Block> {
    /// Consensus implementation used to validate every decoded block.
    pub consensus: Arc<dyn Consensus<B, Error = ConsensusError>>,
    /// Header that the first decoded block is validated against. If `None`, decoded headers are
    /// not validated against their parent.
    pub parent_header: Option<SealedHeader<B::Header>>,
}
202
203impl<B: FullBlock<Header: reth_primitives_traits::BlockHeader>> FromReader
204    for FileClientBuilder<B>
205{
206    type Error = FileClientError;
207    type Output = FileClient<B>;
208
209    /// Initialize the [`FileClient`] from bytes that have been read from file.
210    fn build<R>(
211        &self,
212        reader: R,
213        num_bytes: u64,
214    ) -> impl Future<Output = Result<DecodedFileChunk<Self::Output>, Self::Error>>
215    where
216        R: AsyncReadExt + Unpin,
217    {
218        let mut headers = HashMap::default();
219        let mut hash_to_number = HashMap::default();
220        let mut bodies = HashMap::default();
221
222        // use with_capacity to make sure the internal buffer contains the entire chunk
223        let mut stream =
224            FramedRead::with_capacity(reader, BlockFileCodec::<B>::default(), num_bytes as usize);
225
226        trace!(target: "downloaders::file",
227            target_num_bytes=num_bytes,
228            capacity=stream.read_buffer().capacity(),
229            "init decode stream"
230        );
231
232        let mut remaining_bytes = vec![];
233
234        let mut log_interval = 0;
235        let mut log_interval_start_block = 0;
236
237        let mut parent_header = self.parent_header.clone();
238
239        async move {
240            while let Some(block_res) = stream.next().await {
241                let block = match block_res {
242                    Ok(block) => block,
243                    Err(FileClientError::Rlp(err, bytes)) => {
244                        trace!(target: "downloaders::file",
245                            %err,
246                            bytes_len=bytes.len(),
247                            "partial block returned from decoding chunk"
248                        );
249                        remaining_bytes = bytes;
250                        break
251                    }
252                    Err(err) => return Err(err),
253                };
254
255                let block = SealedBlock::seal_slow(block);
256
257                // Validate standalone header
258                self.consensus.validate_header(block.sealed_header())?;
259                if let Some(parent) = &parent_header {
260                    self.consensus.validate_header_against_parent(block.sealed_header(), parent)?;
261                    parent_header = Some(block.sealed_header().clone());
262                }
263
264                // Validate block against header
265                self.consensus.validate_block_pre_execution(&block)?;
266
267                // add to the internal maps
268                let block_hash = block.hash();
269                let block_number = block.number();
270                let (header, body) = block.split_sealed_header_body();
271                headers.insert(block_number, header.unseal());
272                hash_to_number.insert(block_hash, block_number);
273                bodies.insert(block_hash, body);
274
275                if log_interval == 0 {
276                    trace!(target: "downloaders::file",
277                        block_number,
278                        "read first block"
279                    );
280                    log_interval_start_block = block_number;
281                } else if log_interval % 100_000 == 0 {
282                    trace!(target: "downloaders::file",
283                        blocks=?log_interval_start_block..=block_number,
284                        "read blocks from file"
285                    );
286                    log_interval_start_block = block_number + 1;
287                }
288                log_interval += 1;
289            }
290
291            trace!(target: "downloaders::file", blocks = headers.len(), "Initialized file client");
292
293            Ok(DecodedFileChunk {
294                file_client: FileClient { headers, hash_to_number, bodies },
295                remaining_bytes,
296                highest_block: None,
297            })
298        }
299    }
300}
301
302impl<B: FullBlock> HeadersClient for FileClient<B> {
303    type Header = B::Header;
304    type Output = HeadersFut<B::Header>;
305
306    fn get_headers_with_priority(
307        &self,
308        request: HeadersRequest,
309        _priority: Priority,
310    ) -> Self::Output {
311        // this just searches the buffer, and fails if it can't find the header
312        let mut headers = Vec::new();
313        trace!(target: "downloaders::file", request=?request, "Getting headers");
314
315        let start_num = match request.start {
316            BlockHashOrNumber::Hash(hash) => match self.hash_to_number.get(&hash) {
317                Some(num) => *num,
318                None => {
319                    warn!(%hash, "Could not find starting block number for requested header hash");
320                    return Box::pin(async move { Err(RequestError::BadResponse) })
321                }
322            },
323            BlockHashOrNumber::Number(num) => num,
324        };
325
326        let range = if request.limit == 1 {
327            Either::Left(start_num..start_num + 1)
328        } else {
329            match request.direction {
330                HeadersDirection::Rising => Either::Left(start_num..start_num + request.limit),
331                HeadersDirection::Falling => {
332                    Either::Right((start_num - request.limit + 1..=start_num).rev())
333                }
334            }
335        };
336
337        trace!(target: "downloaders::file", range=?range, "Getting headers with range");
338
339        for block_number in range {
340            match self.headers.get(&block_number).cloned() {
341                Some(header) => headers.push(header),
342                None => {
343                    warn!(number=%block_number, "Could not find header");
344                    return Box::pin(async move { Err(RequestError::BadResponse) })
345                }
346            }
347        }
348
349        Box::pin(async move { Ok((PeerId::default(), headers).into()) })
350    }
351}
352
353impl<B: FullBlock> BodiesClient for FileClient<B> {
354    type Body = B::Body;
355    type Output = BodiesFut<B::Body>;
356
357    fn get_block_bodies_with_priority(
358        &self,
359        hashes: Vec<B256>,
360        _priority: Priority,
361    ) -> Self::Output {
362        // this just searches the buffer, and fails if it can't find the block
363        let mut bodies = Vec::new();
364
365        // check if any are an error
366        // could unwrap here
367        for hash in hashes {
368            match self.bodies.get(&hash).cloned() {
369                Some(body) => bodies.push(body),
370                None => return Box::pin(async move { Err(RequestError::BadResponse) }),
371            }
372        }
373
374        Box::pin(async move { Ok((PeerId::default(), bodies).into()) })
375    }
376}
377
impl<B: FullBlock> DownloadClient for FileClient<B> {
    /// Only emits a trace event, there is no peer that could be penalized.
    fn report_bad_message(&self, _peer_id: PeerId) {
        trace!("Reported a bad message on a file client, the file may be corrupted or invalid");
        // noop
    }

    /// Always returns `1`.
    fn num_connected_peers(&self) -> usize {
        // no such thing as connected peers when we are just using a file
        // NOTE(review): presumably reported as one peer so that callers waiting for peers treat
        // the file as available, confirm against callers
        1
    }
}
389
/// A [`FileClient`] serves both headers and bodies of the block type it was decoded as.
impl<B: FullBlock> BlockClient for FileClient<B> {
    type Block = B;
}
393
/// Chunks file into several [`FileClient`]s.
///
/// Each call to [`Self::next_chunk`] or [`Self::next_receipts_chunk`] reads up to the configured
/// number of bytes from the file and decodes them. Bytes at the end of a chunk that could not be
/// decoded are carried over to the next chunk.
#[derive(Debug)]
pub struct ChunkedFileReader {
    /// File to read from.
    file: File,
    /// Byte length of the part of the file that has not been read yet. Starts out as the file
    /// length and is reduced by every read.
    file_byte_len: u64,
    /// Bytes that have been read but not decoded yet. Between calls this holds the bytes left
    /// over from decoding the previous chunk.
    chunk: Vec<u8>,
    /// Max bytes per chunk.
    chunk_byte_len: u64,
    /// Optionally, tracks highest decoded block number. Needed when decoding data that maps * to 1
    /// with block number
    highest_block: Option<u64>,
}
409
410impl ChunkedFileReader {
411    /// Returns the remaining file length.
412    pub const fn file_len(&self) -> u64 {
413        self.file_byte_len
414    }
415
416    /// Opens the file to import from given path. Returns a new instance. If no chunk byte length
417    /// is passed, chunks have [`DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE`] (one static file).
418    pub async fn new<P: AsRef<Path>>(
419        path: P,
420        chunk_byte_len: Option<u64>,
421    ) -> Result<Self, FileClientError> {
422        let file = File::open(path).await?;
423        let chunk_byte_len = chunk_byte_len.unwrap_or(DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE);
424
425        Self::from_file(file, chunk_byte_len).await
426    }
427
428    /// Opens the file to import from given path. Returns a new instance.
429    pub async fn from_file(file: File, chunk_byte_len: u64) -> Result<Self, FileClientError> {
430        // get file len from metadata before reading
431        let metadata = file.metadata().await?;
432        let file_byte_len = metadata.len();
433
434        Ok(Self { file, file_byte_len, chunk: vec![], chunk_byte_len, highest_block: None })
435    }
436
437    /// Calculates the number of bytes to read from the chain file. Returns a tuple of the chunk
438    /// length and the remaining file length.
439    fn chunk_len(&self) -> u64 {
440        let Self { chunk_byte_len, file_byte_len, .. } = *self;
441        let file_byte_len = file_byte_len + self.chunk.len() as u64;
442
443        if chunk_byte_len > file_byte_len {
444            // last chunk
445            file_byte_len
446        } else {
447            chunk_byte_len
448        }
449    }
450
451    /// Reads bytes from file and buffers as next chunk to decode. Returns byte length of next
452    /// chunk to read.
453    async fn read_next_chunk(&mut self) -> Result<Option<u64>, io::Error> {
454        if self.file_byte_len == 0 && self.chunk.is_empty() {
455            // eof
456            return Ok(None)
457        }
458
459        let chunk_target_len = self.chunk_len();
460        let old_bytes_len = self.chunk.len() as u64;
461
462        // calculate reserved space in chunk
463        let new_read_bytes_target_len = chunk_target_len - old_bytes_len;
464
465        // read new bytes from file
466        let prev_read_bytes_len = self.chunk.len();
467        self.chunk.extend(std::iter::repeat_n(0, new_read_bytes_target_len as usize));
468        let reader = &mut self.chunk[prev_read_bytes_len..];
469
470        // actual bytes that have been read
471        let new_read_bytes_len = self.file.read_exact(reader).await? as u64;
472        let next_chunk_byte_len = self.chunk.len();
473
474        // update remaining file length
475        self.file_byte_len -= new_read_bytes_len;
476
477        debug!(target: "downloaders::file",
478            max_chunk_byte_len=self.chunk_byte_len,
479            prev_read_bytes_len,
480            new_read_bytes_target_len,
481            new_read_bytes_len,
482            next_chunk_byte_len,
483            remaining_file_byte_len=self.file_byte_len,
484            "new bytes were read from file"
485        );
486
487        Ok(Some(next_chunk_byte_len as u64))
488    }
489
490    /// Read next chunk from file. Returns [`FileClient`] containing decoded chunk.
491    pub async fn next_chunk<B: FullBlock>(
492        &mut self,
493        consensus: Arc<dyn Consensus<B, Error = ConsensusError>>,
494        parent_header: Option<SealedHeader<B::Header>>,
495    ) -> Result<Option<FileClient<B>>, FileClientError> {
496        let Some(next_chunk_byte_len) = self.read_next_chunk().await? else { return Ok(None) };
497
498        // make new file client from chunk
499        let DecodedFileChunk { file_client, remaining_bytes, .. } =
500            FileClientBuilder { consensus, parent_header }
501                .build(&self.chunk[..], next_chunk_byte_len)
502                .await?;
503
504        // save left over bytes
505        self.chunk = remaining_bytes;
506
507        Ok(Some(file_client))
508    }
509
510    /// Read next chunk from file. Returns [`FileClient`] containing decoded chunk.
511    pub async fn next_receipts_chunk<T>(&mut self) -> Result<Option<T>, T::Error>
512    where
513        T: FromReceiptReader,
514    {
515        let Some(next_chunk_byte_len) = self.read_next_chunk().await? else { return Ok(None) };
516
517        // make new file client from chunk
518        let DecodedFileChunk { file_client, remaining_bytes, highest_block } =
519            T::from_receipt_reader(&self.chunk[..], next_chunk_byte_len, self.highest_block)
520                .await?;
521
522        // save left over bytes
523        self.chunk = remaining_bytes;
524        // update highest block
525        self.highest_block = highest_block;
526
527        Ok(Some(file_client))
528    }
529}
530
/// Constructs a file client from a reader.
pub trait FromReader {
    /// Error returned by file client type.
    type Error: From<io::Error>;

    /// Output returned by file client type.
    type Output;

    /// Returns a file client, decoded from the bytes provided by `reader`.
    ///
    /// `num_bytes` is the byte length of the data to decode. The returned
    /// [`DecodedFileChunk`] also carries the bytes that could not be decoded.
    fn build<R>(
        &self,
        reader: R,
        num_bytes: u64,
    ) -> impl Future<Output = Result<DecodedFileChunk<Self::Output>, Self::Error>>
    where
        Self: Sized,
        R: AsyncReadExt + Unpin;
}
549
/// Output from decoding a file chunk with [`FromReader::build`].
#[derive(Debug)]
pub struct DecodedFileChunk<T> {
    /// File client, i.e. the decoded part of chunk.
    pub file_client: T,
    /// Remaining bytes that have not been decoded, e.g. a partial block or a partial receipt.
    /// Empty if the whole chunk was decoded.
    pub remaining_bytes: Vec<u8>,
    /// Highest block of decoded chunk. This is needed when decoding data that maps * to 1 with
    /// block number, like receipts. The block decoder in this file always sets it to `None`.
    pub highest_block: Option<u64>,
}
561
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        bodies::{
            bodies::BodiesDownloaderBuilder,
            test_utils::{insert_headers, zip_blocks},
        },
        headers::{reverse_headers::ReverseHeadersDownloaderBuilder, test_utils::child_header},
        test_utils::{generate_bodies, generate_bodies_file},
    };
    use assert_matches::assert_matches;
    use futures_util::stream::StreamExt;
    use rand::Rng;
    use reth_consensus::{noop::NoopConsensus, test_utils::TestConsensus};
    use reth_ethereum_primitives::Block;
    use reth_network_p2p::{
        bodies::downloader::BodyDownloader,
        headers::downloader::{HeaderDownloader, SyncTarget},
    };
    use reth_provider::test_utils::create_test_provider_factory;
    use std::sync::Arc;

    /// Bodies injected with `with_bodies` are served to the bodies downloader.
    #[tokio::test]
    async fn streams_bodies_from_buffer() {
        // Generate some random blocks
        let factory = create_test_provider_factory();
        let (headers, mut bodies) = generate_bodies(0..=19);

        insert_headers(factory.db_ref().db(), &headers);

        // create an empty file
        let file = tempfile::tempfile().unwrap();

        // the client is built from the empty file, its bodies are replaced by the generated ones
        let client: Arc<FileClient<Block>> = Arc::new(
            FileClient::from_file(file.into(), NoopConsensus::arc())
                .await
                .unwrap()
                .with_bodies(bodies.clone()),
        );
        let mut downloader = BodiesDownloaderBuilder::default().build::<Block, _, _>(
            client.clone(),
            Arc::new(TestConsensus::default()),
            factory,
        );
        downloader.set_download_range(0..=19).expect("failed to set download range");

        assert_matches!(
            downloader.next().await,
            Some(Ok(res)) => assert_eq!(res, zip_blocks(headers.iter(), &mut bodies))
        );
    }

    /// Headers injected with `with_headers` are served to the reverse headers downloader.
    #[tokio::test]
    async fn download_headers_at_fork_head() {
        reth_tracing::init_test_tracing();

        // p3 is the default header, p2, p1 and p0 are built as successive child headers
        let p3 = SealedHeader::default();
        let p2 = child_header(&p3);
        let p1 = child_header(&p2);
        let p0 = child_header(&p1);

        let file = tempfile::tempfile().unwrap();
        let client: Arc<FileClient<Block>> = Arc::new(
            FileClient::from_file(file.into(), NoopConsensus::arc()).await.unwrap().with_headers(
                HashMap::from([
                    (0u64, p0.clone_header()),
                    (1, p1.clone_header()),
                    (2, p2.clone_header()),
                    (3, p3.clone_header()),
                ]),
            ),
        );

        let mut downloader = ReverseHeadersDownloaderBuilder::default()
            .stream_batch_size(3)
            .request_limit(3)
            .build(Arc::clone(&client), Arc::new(TestConsensus::default()));
        downloader.update_local_head(p3.clone());
        downloader.update_sync_target(SyncTarget::Tip(p0.hash()));

        // the local head p3 is not part of the download, afterwards the stream is exhausted
        let headers = downloader.next().await.unwrap();
        assert_eq!(headers, Ok(vec![p0, p1, p2]));
        assert!(downloader.next().await.is_none());
        assert!(downloader.next().await.is_none());
    }

    /// Headers decoded from a block file are served to the reverse headers downloader.
    #[tokio::test]
    async fn test_download_headers_from_file() {
        reth_tracing::init_test_tracing();

        // Generate some random blocks
        let (file, headers, _) = generate_bodies_file(0..=19).await;
        // now try to read them back
        let client: Arc<FileClient<Block>> =
            Arc::new(FileClient::from_file(file, NoopConsensus::arc()).await.unwrap());

        // construct headers downloader and use first header
        let mut header_downloader = ReverseHeadersDownloaderBuilder::default()
            .build(Arc::clone(&client), Arc::new(TestConsensus::default()));
        header_downloader.update_local_head(headers.first().unwrap().clone());
        header_downloader.update_sync_target(SyncTarget::Tip(headers.last().unwrap().hash()));

        // get headers first
        let mut downloaded_headers = header_downloader.next().await.unwrap().unwrap();

        // reverse to make sure it's in the right order before comparing
        downloaded_headers.reverse();

        // the first header is not included in the response
        assert_eq!(downloaded_headers, headers[1..]);
    }

    /// Bodies decoded from a block file are served to the bodies downloader.
    #[tokio::test]
    async fn test_download_bodies_from_file() {
        // Generate some random blocks
        let factory = create_test_provider_factory();
        let (file, headers, mut bodies) = generate_bodies_file(0..=19).await;

        // now try to read them back
        let client: Arc<FileClient<Block>> =
            Arc::new(FileClient::from_file(file, NoopConsensus::arc()).await.unwrap());

        // insert headers in db for the bodies downloader
        insert_headers(factory.db_ref().db(), &headers);

        let mut downloader = BodiesDownloaderBuilder::default().build::<Block, _, _>(
            client.clone(),
            Arc::new(TestConsensus::default()),
            factory,
        );
        downloader.set_download_range(0..=19).expect("failed to set download range");

        assert_matches!(
            downloader.next().await,
            Some(Ok(res)) => assert_eq!(res, zip_blocks(headers.iter(), &mut bodies))
        );
    }

    /// Headers of a block file that is read in chunks of random byte length are downloaded
    /// chunk by chunk and add up to all headers of the file except the first one.
    #[tokio::test]
    async fn test_chunk_download_headers_from_file() {
        reth_tracing::init_test_tracing();

        // Generate some random blocks
        let (file, headers, _) = generate_bodies_file(0..=14).await;

        // calculate min for chunk byte length range, pick a lower bound that guarantees at least
        // one block will be read
        let chunk_byte_len = rand::rng().random_range(2000..=10_000);
        trace!(target: "downloaders::file::test", chunk_byte_len);

        // init reader
        let mut reader = ChunkedFileReader::from_file(file, chunk_byte_len as u64).await.unwrap();

        let mut downloaded_headers: Vec<SealedHeader> = vec![];

        // local head of the first chunk, replaced by the tip of each downloaded chunk
        let mut local_header = headers.first().unwrap().clone();

        // test
        while let Some(client) =
            reader.next_chunk::<Block>(NoopConsensus::arc(), None).await.unwrap()
        {
            // the tip of the chunk is the sync target for this chunk
            let sync_target = client.tip_header().unwrap();

            let sync_target_hash = sync_target.hash();

            // construct headers downloader and use first header
            let mut header_downloader = ReverseHeadersDownloaderBuilder::default()
                .build(Arc::clone(&Arc::new(client)), Arc::new(TestConsensus::default()));
            header_downloader.update_local_head(local_header.clone());
            header_downloader.update_sync_target(SyncTarget::Tip(sync_target_hash));

            // get headers first
            let mut downloaded_headers_chunk = header_downloader.next().await.unwrap().unwrap();

            // export new local header to outer scope
            local_header = sync_target;

            // reverse to make sure it's in the right order before comparing
            downloaded_headers_chunk.reverse();
            downloaded_headers.extend_from_slice(&downloaded_headers_chunk);
        }

        // the first header is not included in the response
        assert_eq!(headers[1..], downloaded_headers);
    }
}
748}