1use alloy_consensus::BlockHeader;
2use alloy_eips::BlockHashOrNumber;
3use alloy_primitives::{BlockHash, BlockNumber, Sealable, B256};
4use async_compression::tokio::bufread::GzipDecoder;
5use futures::Future;
6use itertools::Either;
7use reth_consensus::{Consensus, ConsensusError};
8use reth_network_p2p::{
9 bodies::client::{BodiesClient, BodiesFut},
10 download::DownloadClient,
11 error::RequestError,
12 headers::client::{HeadersClient, HeadersDirection, HeadersFut, HeadersRequest},
13 priority::Priority,
14 BlockClient,
15};
16use reth_network_peers::PeerId;
17use reth_primitives_traits::{Block, BlockBody, FullBlock, SealedBlock, SealedHeader};
18use std::{collections::HashMap, io, ops::RangeInclusive, path::Path, sync::Arc};
19use thiserror::Error;
20use tokio::{
21 fs::File,
22 io::{AsyncReadExt, BufReader},
23};
24use tokio_stream::StreamExt;
25use tokio_util::codec::FramedRead;
26use tracing::{debug, trace, warn};
27
28use super::file_codec::BlockFileCodec;
29use crate::receipt_file_client::FromReceiptReader;
30
/// Default byte length of a chunk read from a chain file: 1 GB (10^9 bytes).
///
/// [`ChunkedFileReader::new`] falls back to this value when no chunk length is supplied.
pub const DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE: u64 = 1_000_000_000;
35
/// In-memory store of headers and bodies decoded from a block file, served through the
/// [`HeadersClient`] and [`BodiesClient`] traits.
///
/// Headers are keyed by block number and bodies by block hash; `hash_to_number` links the two.
#[derive(Debug, Clone)]
pub struct FileClient<B: Block> {
    /// Headers, keyed by block number.
    headers: HashMap<BlockNumber, B::Header>,

    /// Lookup from block hash to block number.
    hash_to_number: HashMap<BlockHash, BlockNumber>,

    /// Block bodies, keyed by block hash.
    bodies: HashMap<BlockHash, B::Body>,
}
58
/// An error that can occur when constructing or using a [`FileClient`].
#[derive(Debug, Error)]
pub enum FileClientError {
    /// A decoded block failed consensus validation.
    #[error(transparent)]
    Consensus(#[from] ConsensusError),

    /// An I/O error, e.g. while opening or reading the file.
    #[error(transparent)]
    Io(#[from] std::io::Error),

    /// An RLP decoding error, together with the bytes returned alongside it. The chunk builder
    /// treats those bytes as the undecoded remainder of the chunk.
    #[error("{0}")]
    Rlp(alloy_rlp::Error, Vec<u8>),

    /// A custom error carrying a static message.
    #[error("{0}")]
    Custom(&'static str),
}
78
79impl From<&'static str> for FileClientError {
80 fn from(value: &'static str) -> Self {
81 Self::Custom(value)
82 }
83}
84
85impl<B: FullBlock> FileClient<B> {
86 pub async fn new<P: AsRef<Path>>(
88 path: P,
89 consensus: Arc<dyn Consensus<B, Error = ConsensusError>>,
90 ) -> Result<Self, FileClientError> {
91 let file = File::open(path).await?;
92 Self::from_file(file, consensus).await
93 }
94
95 pub(crate) async fn from_file(
97 mut file: File,
98 consensus: Arc<dyn Consensus<B, Error = ConsensusError>>,
99 ) -> Result<Self, FileClientError> {
100 let metadata = file.metadata().await?;
102 let file_len = metadata.len();
103
104 let mut reader = vec![];
105 file.read_to_end(&mut reader).await?;
106
107 Ok(FileClientBuilder { consensus, parent_header: None }
108 .build(&reader[..], file_len)
109 .await?
110 .file_client)
111 }
112
113 pub fn tip(&self) -> Option<B256> {
115 self.headers.get(&self.max_block()?).map(|h| h.hash_slow())
116 }
117
118 pub fn start(&self) -> Option<B256> {
120 self.headers.get(&self.min_block()?).map(|h| h.hash_slow())
121 }
122
123 pub fn max_block(&self) -> Option<u64> {
125 self.headers.keys().max().copied()
126 }
127
128 pub fn min_block(&self) -> Option<u64> {
130 self.headers.keys().min().copied()
131 }
132
133 pub fn tip_header(&self) -> Option<SealedHeader<B::Header>> {
136 self.headers.get(&self.max_block()?).map(|h| SealedHeader::seal_slow(h.clone()))
137 }
138
139 pub fn has_canonical_blocks(&self) -> bool {
141 if self.headers.is_empty() {
142 return true
143 }
144 let mut nums = self.headers.keys().copied().collect::<Vec<_>>();
145 nums.sort_unstable();
146 let mut iter = nums.into_iter();
147 let mut lowest = iter.next().expect("not empty");
148 for next in iter {
149 if next != lowest + 1 {
150 return false
151 }
152 lowest = next;
153 }
154 true
155 }
156
157 pub fn with_bodies(mut self, bodies: HashMap<BlockHash, B::Body>) -> Self {
159 self.bodies = bodies;
160 self
161 }
162
163 pub fn with_headers(mut self, headers: HashMap<BlockNumber, B::Header>) -> Self {
165 self.headers = headers;
166 for (number, header) in &self.headers {
167 self.hash_to_number.insert(header.hash_slow(), *number);
168 }
169 self
170 }
171
172 pub fn headers_len(&self) -> usize {
174 self.headers.len()
175 }
176
177 pub fn bodies_len(&self) -> usize {
179 self.bodies.len()
180 }
181
182 pub fn headers_iter(&self) -> impl Iterator<Item = &B::Header> {
184 self.headers.values()
185 }
186
187 pub fn bodies_iter_mut(&mut self) -> impl Iterator<Item = (u64, &mut B::Body)> {
191 let bodies = &mut self.bodies;
192 let numbers = &self.hash_to_number;
193 bodies.iter_mut().map(|(hash, body)| (numbers[hash], body))
194 }
195
196 pub fn total_transactions(&self) -> usize {
198 self.bodies.iter().fold(0, |acc, (_, body)| acc + body.transactions().len())
199 }
200}
201
/// Builds a [`FileClient`] from raw block bytes, validating every decoded block with
/// `consensus`.
struct FileClientBuilder<B: Block> {
    /// Consensus implementation used to validate decoded blocks.
    pub consensus: Arc<dyn Consensus<B, Error = ConsensusError>>,
    /// Header the first decoded block is validated against. When `None`, the builder performs
    /// no parent validation.
    pub parent_header: Option<SealedHeader<B::Header>>,
}
206
impl<B: FullBlock<Header: reth_primitives_traits::BlockHeader>> FromReader
    for FileClientBuilder<B>
{
    type Error = FileClientError;
    type Output = FileClient<B>;

    /// Decodes blocks from `reader`, validates them, and collects them into a [`FileClient`].
    ///
    /// `num_bytes` sizes the decoder's read buffer. Decoding stops at the first RLP error; the
    /// bytes attached to that error are returned as `remaining_bytes` so the caller can retry
    /// them with more data. Any other decode error, and any consensus error, aborts the build.
    /// `highest_block` in the result is always `None`.
    fn build<R>(
        &self,
        reader: R,
        num_bytes: u64,
    ) -> impl Future<Output = Result<DecodedFileChunk<Self::Output>, Self::Error>>
    where
        R: AsyncReadExt + Unpin,
    {
        let mut headers = HashMap::default();
        let mut hash_to_number = HashMap::default();
        let mut bodies = HashMap::default();

        // Size the framed reader's buffer to the number of bytes expected in this chunk.
        let mut stream =
            FramedRead::with_capacity(reader, BlockFileCodec::<B>::default(), num_bytes as usize);

        trace!(target: "downloaders::file",
            target_num_bytes=num_bytes,
            capacity=stream.read_buffer().capacity(),
            "init decode stream"
        );

        // Bytes left over when the stream ends in an RLP error.
        let mut remaining_bytes = vec![];

        // Counts decoded blocks; drives the periodic progress log below.
        let mut log_interval = 0;
        let mut log_interval_start_block = 0;

        let mut parent_header = self.parent_header.clone();

        async move {
            while let Some(block_res) = stream.next().await {
                let block = match block_res {
                    Ok(block) => block,
                    Err(FileClientError::Rlp(err, bytes)) => {
                        trace!(target: "downloaders::file",
                            %err,
                            bytes_len=bytes.len(),
                            "partial block returned from decoding chunk"
                        );
                        // Hand the undecoded bytes back to the caller and stop decoding.
                        remaining_bytes = bytes;
                        break
                    }
                    Err(err) => return Err(err),
                };

                let block = SealedBlock::seal_slow(block);

                // Validate the header on its own.
                self.consensus.validate_header(block.sealed_header())?;
                // Validate against the parent only when one is known; the parent then advances
                // to the current block. With no initial parent this branch is never taken.
                if let Some(parent) = &parent_header {
                    self.consensus.validate_header_against_parent(block.sealed_header(), parent)?;
                    parent_header = Some(block.sealed_header().clone());
                }

                // Validate the block's pre-execution rules.
                self.consensus.validate_block_pre_execution(&block)?;

                // Add the block to the internal maps.
                let block_hash = block.hash();
                let block_number = block.number();
                let (header, body) = block.split_sealed_header_body();
                headers.insert(block_number, header.unseal());
                hash_to_number.insert(block_hash, block_number);
                bodies.insert(block_hash, body);

                // Log the first block, then one range summary every 100_000 blocks.
                if log_interval == 0 {
                    trace!(target: "downloaders::file",
                        block_number,
                        "read first block"
                    );
                    log_interval_start_block = block_number;
                } else if log_interval % 100_000 == 0 {
                    trace!(target: "downloaders::file",
                        blocks=?log_interval_start_block..=block_number,
                        "read blocks from file"
                    );
                    log_interval_start_block = block_number + 1;
                }
                log_interval += 1;
            }

            trace!(target: "downloaders::file", blocks = headers.len(), "Initialized file client");

            Ok(DecodedFileChunk {
                file_client: FileClient { headers, hash_to_number, bodies },
                remaining_bytes,
                highest_block: None,
            })
        }
    }
}
305
306impl<B: FullBlock> HeadersClient for FileClient<B> {
307 type Header = B::Header;
308 type Output = HeadersFut<B::Header>;
309
310 fn get_headers_with_priority(
311 &self,
312 request: HeadersRequest,
313 _priority: Priority,
314 ) -> Self::Output {
315 let mut headers = Vec::new();
317 trace!(target: "downloaders::file", request=?request, "Getting headers");
318
319 let start_num = match request.start {
320 BlockHashOrNumber::Hash(hash) => match self.hash_to_number.get(&hash) {
321 Some(num) => *num,
322 None => {
323 warn!(%hash, "Could not find starting block number for requested header hash");
324 return Box::pin(async move { Err(RequestError::BadResponse) })
325 }
326 },
327 BlockHashOrNumber::Number(num) => num,
328 };
329
330 let range = if request.limit == 1 {
331 Either::Left(start_num..start_num + 1)
332 } else {
333 match request.direction {
334 HeadersDirection::Rising => Either::Left(start_num..start_num + request.limit),
335 HeadersDirection::Falling => {
336 Either::Right((start_num - request.limit + 1..=start_num).rev())
337 }
338 }
339 };
340
341 trace!(target: "downloaders::file", range=?range, "Getting headers with range");
342
343 for block_number in range {
344 match self.headers.get(&block_number).cloned() {
345 Some(header) => headers.push(header),
346 None => {
347 warn!(number=%block_number, "Could not find header");
348 return Box::pin(async move { Err(RequestError::BadResponse) })
349 }
350 }
351 }
352
353 Box::pin(async move { Ok((PeerId::default(), headers).into()) })
354 }
355}
356
357impl<B: FullBlock> BodiesClient for FileClient<B> {
358 type Body = B::Body;
359 type Output = BodiesFut<B::Body>;
360
361 fn get_block_bodies_with_priority_and_range_hint(
362 &self,
363 hashes: Vec<B256>,
364 _priority: Priority,
365 _range_hint: Option<RangeInclusive<u64>>,
366 ) -> Self::Output {
367 let mut bodies = Vec::new();
369
370 for hash in hashes {
373 match self.bodies.get(&hash).cloned() {
374 Some(body) => bodies.push(body),
375 None => return Box::pin(async move { Err(RequestError::BadResponse) }),
376 }
377 }
378
379 Box::pin(async move { Ok((PeerId::default(), bodies).into()) })
380 }
381}
382
impl<B: FullBlock> DownloadClient for FileClient<B> {
    /// Only emits a trace message; the peer id is ignored and no other action is taken.
    fn report_bad_message(&self, _peer_id: PeerId) {
        trace!("Reported a bad message on a file client, the file may be corrupted or invalid");
    }

    /// Always reports exactly one connected peer.
    fn num_connected_peers(&self) -> usize {
        // A file has no real peers; a constant 1 is reported.
        1
    }
}
394
/// Marks [`FileClient`] as a combined headers-and-bodies client for block type `B`.
impl<B: FullBlock> BlockClient for FileClient<B> {
    type Block = B;
}
398
/// Byte source behind a [`ChunkedFileReader`], with one variant per supported file format.
#[derive(Debug)]
enum FileReader {
    /// Uncompressed file, together with the number of bytes not yet read from it.
    Plain { file: File, remaining_bytes: u64 },
    /// Gzip-compressed file, decompressed while reading.
    Gzip(GzipDecoder<BufReader<File>>),
}
407
408impl FileReader {
409 async fn read(&mut self, buf: &mut [u8]) -> Result<usize, io::Error> {
411 match self {
412 Self::Plain { file, .. } => file.read(buf).await,
413 Self::Gzip(decoder) => decoder.read(buf).await,
414 }
415 }
416
417 async fn read_next_chunk(
420 &mut self,
421 chunk: &mut Vec<u8>,
422 chunk_byte_len: u64,
423 ) -> Result<Option<u64>, FileClientError> {
424 match self {
425 Self::Plain { .. } => self.read_plain_chunk(chunk, chunk_byte_len).await,
426 Self::Gzip(_) => {
427 Ok((self.read_gzip_chunk(chunk, chunk_byte_len).await?)
428 .then_some(chunk.len() as u64))
429 }
430 }
431 }
432
433 async fn read_plain_chunk(
434 &mut self,
435 chunk: &mut Vec<u8>,
436 chunk_byte_len: u64,
437 ) -> Result<Option<u64>, FileClientError> {
438 let Self::Plain { file, remaining_bytes } = self else {
439 unreachable!("read_plain_chunk should only be called on Plain variant")
440 };
441
442 if *remaining_bytes == 0 && chunk.is_empty() {
443 return Ok(None)
445 }
446
447 let chunk_target_len = chunk_byte_len.min(*remaining_bytes + chunk.len() as u64);
448 let old_bytes_len = chunk.len() as u64;
449
450 let new_read_bytes_target_len = chunk_target_len - old_bytes_len;
452
453 let prev_read_bytes_len = chunk.len();
455 chunk.extend(std::iter::repeat_n(0, new_read_bytes_target_len as usize));
456 let reader = &mut chunk[prev_read_bytes_len..];
457
458 let new_read_bytes_len = file.read_exact(reader).await? as u64;
460 let next_chunk_byte_len = chunk.len();
461
462 *remaining_bytes -= new_read_bytes_len;
464
465 debug!(target: "downloaders::file",
466 max_chunk_byte_len=chunk_byte_len,
467 prev_read_bytes_len,
468 new_read_bytes_target_len,
469 new_read_bytes_len,
470 next_chunk_byte_len,
471 remaining_file_byte_len=*remaining_bytes,
472 "new bytes were read from file"
473 );
474
475 Ok(Some(next_chunk_byte_len as u64))
476 }
477
478 async fn read_gzip_chunk(
480 &mut self,
481 chunk: &mut Vec<u8>,
482 chunk_byte_len: u64,
483 ) -> Result<bool, FileClientError> {
484 let mut buffer = vec![0u8; 64 * 1024];
485 loop {
486 if chunk.len() >= chunk_byte_len as usize {
487 return Ok(true)
488 }
489
490 match self.read(&mut buffer).await {
491 Ok(0) => return Ok(!chunk.is_empty()),
492 Ok(n) => {
493 chunk.extend_from_slice(&buffer[..n]);
494 }
495 Err(e) => return Err(e.into()),
496 }
497 }
498 }
499}
500
/// Reads a file in chunks so that its contents can be decoded piece by piece.
#[derive(Debug)]
pub struct ChunkedFileReader {
    /// Source the bytes are read from.
    file: FileReader,
    /// Bytes of the current chunk, including bytes left undecoded by the previous chunk.
    chunk: Vec<u8>,
    /// Target byte length of one chunk.
    chunk_byte_len: u64,
    /// Highest block value reported by the most recent receipts chunk, passed on to the next
    /// receipts chunk.
    highest_block: Option<u64>,
}
514
515impl ChunkedFileReader {
516 pub async fn new<P: AsRef<Path>>(
520 path: P,
521 chunk_byte_len: Option<u64>,
522 ) -> Result<Self, FileClientError> {
523 let path = path.as_ref();
524 let file = File::open(path).await?;
525 let chunk_byte_len = chunk_byte_len.unwrap_or(DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE);
526
527 Self::from_file(
528 file,
529 chunk_byte_len,
530 path.extension()
531 .and_then(|ext| ext.to_str())
532 .is_some_and(|ext| ["gz", "gzip"].contains(&ext)),
533 )
534 .await
535 }
536
537 pub async fn from_file(
539 file: File,
540 chunk_byte_len: u64,
541 is_gzip: bool,
542 ) -> Result<Self, FileClientError> {
543 let file_reader = if is_gzip {
544 FileReader::Gzip(GzipDecoder::new(BufReader::new(file)))
545 } else {
546 let remaining_bytes = file.metadata().await?.len();
547 FileReader::Plain { file, remaining_bytes }
548 };
549
550 Ok(Self { file: file_reader, chunk: vec![], chunk_byte_len, highest_block: None })
551 }
552
553 async fn read_next_chunk(&mut self) -> Result<Option<u64>, FileClientError> {
556 self.file.read_next_chunk(&mut self.chunk, self.chunk_byte_len).await
557 }
558
559 pub async fn next_chunk<B: FullBlock>(
564 &mut self,
565 consensus: Arc<dyn Consensus<B, Error = ConsensusError>>,
566 parent_header: Option<SealedHeader<B::Header>>,
567 ) -> Result<Option<FileClient<B>>, FileClientError> {
568 let Some(chunk_len) = self.read_next_chunk().await? else { return Ok(None) };
569
570 let DecodedFileChunk { file_client, remaining_bytes, .. } =
572 FileClientBuilder { consensus, parent_header }
573 .build(&self.chunk[..], chunk_len)
574 .await?;
575
576 self.chunk = remaining_bytes;
578
579 Ok(Some(file_client))
580 }
581
582 pub async fn next_receipts_chunk<T>(&mut self) -> Result<Option<T>, T::Error>
584 where
585 T: FromReceiptReader,
586 {
587 let Some(next_chunk_byte_len) = self.read_next_chunk().await.map_err(|e| {
588 T::Error::from(match e {
589 FileClientError::Io(io_err) => io_err,
590 _ => io::Error::other(e.to_string()),
591 })
592 })?
593 else {
594 return Ok(None)
595 };
596
597 let DecodedFileChunk { file_client, remaining_bytes, highest_block } =
599 T::from_receipt_reader(&self.chunk[..], next_chunk_byte_len, self.highest_block)
600 .await?;
601
602 self.chunk = remaining_bytes;
604 self.highest_block = highest_block;
606
607 Ok(Some(file_client))
608 }
609}
610
/// Constructs a value from a reader of bytes.
pub trait FromReader {
    /// Error returned when building fails; must be constructible from an [`io::Error`].
    type Error: From<io::Error>;

    /// Type produced by a successful build.
    type Output;

    /// Builds [`Self::Output`] from `reader`, given the number of bytes `num_bytes`.
    ///
    /// The result is wrapped in a [`DecodedFileChunk`], which also carries any bytes that were
    /// left undecoded.
    fn build<R>(
        &self,
        reader: R,
        num_bytes: u64,
    ) -> impl Future<Output = Result<DecodedFileChunk<Self::Output>, Self::Error>>
    where
        Self: Sized,
        R: AsyncReadExt + Unpin;
}
629
/// Result of decoding one chunk of a file.
#[derive(Debug)]
pub struct DecodedFileChunk<T> {
    /// The value decoded from the chunk.
    pub file_client: T,
    /// Bytes of the chunk that were not decoded; they are carried over to the next chunk.
    pub remaining_bytes: Vec<u8>,
    /// Highest block reported by the decoder, if it reports one.
    pub highest_block: Option<u64>,
}
641
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        bodies::{
            bodies::BodiesDownloaderBuilder,
            test_utils::{insert_headers, zip_blocks},
        },
        headers::{reverse_headers::ReverseHeadersDownloaderBuilder, test_utils::child_header},
        test_utils::{generate_bodies, generate_bodies_file},
    };
    use assert_matches::assert_matches;
    use async_compression::tokio::write::GzipEncoder;
    use futures_util::stream::StreamExt;
    use rand::Rng;
    use reth_consensus::{noop::NoopConsensus, test_utils::TestConsensus};
    use reth_ethereum_primitives::Block;
    use reth_network_p2p::{
        bodies::downloader::BodyDownloader,
        headers::downloader::{HeaderDownloader, SyncTarget},
    };
    use reth_provider::test_utils::create_test_provider_factory;
    use std::sync::Arc;
    use tokio::{
        fs::File,
        io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom},
    };

    /// Bodies injected with `with_bodies` are served to the bodies downloader.
    #[tokio::test]
    async fn streams_bodies_from_buffer() {
        // Generate some random blocks
        let factory = create_test_provider_factory();
        let (headers, mut bodies) = generate_bodies(0..=19);

        insert_headers(&factory, &headers);

        // A fresh temp file: the client is populated through `with_bodies` instead.
        let file = tempfile::tempfile().unwrap();

        let client: Arc<FileClient<Block>> = Arc::new(
            FileClient::from_file(file.into(), NoopConsensus::arc())
                .await
                .unwrap()
                .with_bodies(bodies.clone()),
        );
        let mut downloader = BodiesDownloaderBuilder::default().build::<Block, _, _>(
            client.clone(),
            Arc::new(TestConsensus::default()),
            factory,
        );
        downloader.set_download_range(0..=19).expect("failed to set download range");

        assert_matches!(
            downloader.next().await,
            Some(Ok(res)) => assert_eq!(res, zip_blocks(headers.iter(), &mut bodies))
        );
    }

    /// Headers injected with `with_headers` are served to the reverse headers downloader.
    #[tokio::test]
    async fn download_headers_at_fork_head() {
        reth_tracing::init_test_tracing();

        // A chain of four headers, each built from the previous one with `child_header`.
        let p3 = SealedHeader::default();
        let p2 = child_header(&p3);
        let p1 = child_header(&p2);
        let p0 = child_header(&p1);

        let file = tempfile::tempfile().unwrap();
        let client: Arc<FileClient<Block>> = Arc::new(
            FileClient::from_file(file.into(), NoopConsensus::arc()).await.unwrap().with_headers(
                HashMap::from([
                    (0u64, p0.clone_header()),
                    (1, p1.clone_header()),
                    (2, p2.clone_header()),
                    (3, p3.clone_header()),
                ]),
            ),
        );

        let mut downloader = ReverseHeadersDownloaderBuilder::default()
            .stream_batch_size(3)
            .request_limit(3)
            .build(Arc::clone(&client), Arc::new(TestConsensus::default()));
        downloader.update_local_head(p3.clone());
        downloader.update_sync_target(SyncTarget::Tip(p0.hash()));

        let headers = downloader.next().await.unwrap();
        assert_eq!(headers, Ok(vec![p0, p1, p2]));
        assert!(downloader.next().await.is_none());
        assert!(downloader.next().await.is_none());
    }

    /// Headers decoded from a generated block file can be downloaded in full.
    #[tokio::test]
    async fn test_download_headers_from_file() {
        reth_tracing::init_test_tracing();

        // Generate some random blocks
        let (file, headers, _) = generate_bodies_file(0..=19).await;
        // now try to read them back
        let client: Arc<FileClient<Block>> =
            Arc::new(FileClient::from_file(file, NoopConsensus::arc()).await.unwrap());

        // construct headers downloader and use first header
        let mut header_downloader = ReverseHeadersDownloaderBuilder::default()
            .build(Arc::clone(&client), Arc::new(TestConsensus::default()));
        header_downloader.update_local_head(headers.first().unwrap().clone());
        header_downloader.update_sync_target(SyncTarget::Tip(headers.last().unwrap().hash()));

        // get headers first
        let mut downloaded_headers = header_downloader.next().await.unwrap().unwrap();

        // reverse to make sure it's in the right order before comparing
        downloaded_headers.reverse();

        // the first header is not included in the response
        assert_eq!(downloaded_headers, headers[1..]);
    }

    /// Bodies decoded from a generated block file can be downloaded in full.
    #[tokio::test]
    async fn test_download_bodies_from_file() {
        // Generate some random blocks
        let factory = create_test_provider_factory();
        let (file, headers, mut bodies) = generate_bodies_file(0..=19).await;

        // now try to read them back
        let client: Arc<FileClient<Block>> =
            Arc::new(FileClient::from_file(file, NoopConsensus::arc()).await.unwrap());

        // insert headers in db for the bodies downloader
        insert_headers(&factory, &headers);

        let mut downloader = BodiesDownloaderBuilder::default().build::<Block, _, _>(
            client.clone(),
            Arc::new(TestConsensus::default()),
            factory,
        );
        downloader.set_download_range(0..=19).expect("failed to set download range");

        assert_matches!(
            downloader.next().await,
            Some(Ok(res)) => assert_eq!(res, zip_blocks(headers.iter(), &mut bodies))
        );
    }

    /// Headers can be downloaded chunk by chunk from an uncompressed file.
    #[tokio::test]
    async fn test_chunk_download_headers_from_file() {
        reth_tracing::init_test_tracing();

        // Generate some random blocks
        let (file, headers, _) = generate_bodies_file(0..=14).await;

        // A random chunk length, so chunk boundaries vary between runs.
        let chunk_byte_len = rand::rng().random_range(2000..=10_000);
        trace!(target: "downloaders::file::test", chunk_byte_len);

        // init reader
        let mut reader =
            ChunkedFileReader::from_file(file, chunk_byte_len as u64, false).await.unwrap();

        let mut downloaded_headers: Vec<SealedHeader> = vec![];

        let mut local_header = headers.first().unwrap().clone();

        // test
        while let Some(client) =
            reader.next_chunk::<Block>(NoopConsensus::arc(), None).await.unwrap()
        {
            let sync_target = client.tip_header().unwrap();

            let sync_target_hash = sync_target.hash();

            // construct headers downloader and use first header
            let mut header_downloader = ReverseHeadersDownloaderBuilder::default()
                .build(Arc::new(client), Arc::new(TestConsensus::default()));
            header_downloader.update_local_head(local_header.clone());
            header_downloader.update_sync_target(SyncTarget::Tip(sync_target_hash));

            // get headers first
            let mut downloaded_headers_chunk = header_downloader.next().await.unwrap().unwrap();

            // export new local header to outer scope
            local_header = sync_target;

            // reverse to make sure it's in the right order before comparing
            downloaded_headers_chunk.reverse();
            downloaded_headers.extend_from_slice(&downloaded_headers_chunk);
        }

        // the first header is not included in the response
        assert_eq!(headers[1..], downloaded_headers);
    }

    /// Headers can be downloaded chunk by chunk from a gzip-compressed file.
    #[tokio::test]
    async fn test_chunk_download_headers_from_gzip_file() {
        reth_tracing::init_test_tracing();

        // Generate some random blocks
        let (file, headers, _) = generate_bodies_file(0..=14).await;

        // Reserve a path for the gzip file; only the path is kept, the handle is dropped.
        let gzip_temp_file = tempfile::NamedTempFile::new().unwrap();
        let gzip_path = gzip_temp_file.path().to_owned();
        drop(gzip_temp_file);

        // Read the uncompressed content back from the start of the generated file.
        let mut original_file = file;
        original_file.seek(SeekFrom::Start(0)).await.unwrap();
        let mut original_content = Vec::new();
        original_file.read_to_end(&mut original_content).await.unwrap();

        let mut gzip_file = File::create(&gzip_path).await.unwrap();
        let mut encoder = GzipEncoder::new(&mut gzip_file);

        // Write the original content through the gzip encoder
        encoder.write_all(&original_content).await.unwrap();
        encoder.shutdown().await.unwrap();
        drop(gzip_file);

        // Reopen the gzipped file for reading
        let gzip_file = File::open(&gzip_path).await.unwrap();

        // A random chunk length, so chunk boundaries vary between runs.
        let chunk_byte_len = rand::rng().random_range(2000..=10_000);
        trace!(target: "downloaders::file::test", chunk_byte_len);

        // init chunked file reader with gzipped file
        let mut reader =
            ChunkedFileReader::from_file(gzip_file, chunk_byte_len as u64, true).await.unwrap();

        let mut downloaded_headers: Vec<SealedHeader> = vec![];

        let mut local_header = headers.first().unwrap().clone();

        // test
        while let Some(client) =
            reader.next_chunk::<Block>(NoopConsensus::arc(), None).await.unwrap()
        {
            // Skip chunks from which no block was decoded.
            if client.headers_len() == 0 {
                continue;
            }

            let sync_target = client.tip_header().expect("tip_header should not be None");

            let sync_target_hash = sync_target.hash();

            // construct headers downloader and use first header
            let mut header_downloader = ReverseHeadersDownloaderBuilder::default()
                .build(Arc::new(client), Arc::new(TestConsensus::default()));
            header_downloader.update_local_head(local_header.clone());
            header_downloader.update_sync_target(SyncTarget::Tip(sync_target_hash));

            // get headers first
            let mut downloaded_headers_chunk = header_downloader.next().await.unwrap().unwrap();

            // export new local header to outer scope
            local_header = sync_target;

            // reverse to make sure it's in the right order before comparing
            downloaded_headers_chunk.reverse();
            downloaded_headers.extend_from_slice(&downloaded_headers_chunk);
        }

        // the first header is not included in the response
        assert_eq!(headers[1..], downloaded_headers);
    }
}