1use alloy_consensus::BlockHeader;
2use alloy_eips::BlockHashOrNumber;
3use alloy_primitives::{BlockHash, BlockNumber, Sealable, B256};
4use async_compression::tokio::bufread::GzipDecoder;
5use futures::Future;
6use itertools::Either;
7use reth_consensus::{Consensus, ConsensusError};
8use reth_network_p2p::{
9 bodies::client::{BodiesClient, BodiesFut},
10 download::DownloadClient,
11 error::RequestError,
12 headers::client::{HeadersClient, HeadersDirection, HeadersFut, HeadersRequest},
13 priority::Priority,
14 BlockClient,
15};
16use reth_network_peers::PeerId;
17use reth_primitives_traits::{Block, BlockBody, FullBlock, SealedBlock, SealedHeader};
18use std::{collections::HashMap, io, ops::RangeInclusive, path::Path, sync::Arc};
19use thiserror::Error;
20use tokio::{
21 fs::File,
22 io::{AsyncReadExt, BufReader},
23};
24use tokio_stream::StreamExt;
25use tokio_util::codec::FramedRead;
26use tracing::{debug, trace, warn};
27
28use super::file_codec::BlockFileCodec;
29use crate::receipt_file_client::FromReceiptReader;
30
/// Default size, in bytes, of a single chunk read from a chain file (10^9 bytes).
///
/// [`ChunkedFileReader::new`] falls back to this value when the caller passes `None` as the
/// chunk length.
pub const DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE: u64 = 1_000_000_000;
35
36#[derive(Debug, Clone)]
48pub struct FileClient<B: Block> {
49 headers: HashMap<BlockNumber, B::Header>,
51
52 hash_to_number: HashMap<BlockHash, BlockNumber>,
54
55 bodies: HashMap<BlockHash, B::Body>,
57}
58
59#[derive(Debug, Error)]
61pub enum FileClientError {
62 #[error(transparent)]
64 Consensus(#[from] ConsensusError),
65
66 #[error(transparent)]
68 Io(#[from] std::io::Error),
69
70 #[error("{0}")]
72 Rlp(alloy_rlp::Error, Vec<u8>),
73
74 #[error("{0}")]
76 Custom(&'static str),
77}
78
79impl From<&'static str> for FileClientError {
80 fn from(value: &'static str) -> Self {
81 Self::Custom(value)
82 }
83}
84
85impl<B: FullBlock> FileClient<B> {
86 pub async fn new<P: AsRef<Path>>(
88 path: P,
89 consensus: Arc<dyn Consensus<B, Error = ConsensusError>>,
90 ) -> Result<Self, FileClientError> {
91 let file = File::open(path).await?;
92 Self::from_file(file, consensus).await
93 }
94
95 pub(crate) async fn from_file(
97 mut file: File,
98 consensus: Arc<dyn Consensus<B, Error = ConsensusError>>,
99 ) -> Result<Self, FileClientError> {
100 let metadata = file.metadata().await?;
102 let file_len = metadata.len();
103
104 let mut reader = vec![];
105 file.read_to_end(&mut reader).await?;
106
107 Ok(FileClientBuilder { consensus, parent_header: None }
108 .build(&reader[..], file_len)
109 .await?
110 .file_client)
111 }
112
113 pub fn tip(&self) -> Option<B256> {
115 self.headers.get(&self.max_block()?).map(|h| h.hash_slow())
116 }
117
118 pub fn start(&self) -> Option<B256> {
120 self.headers.get(&self.min_block()?).map(|h| h.hash_slow())
121 }
122
123 pub fn max_block(&self) -> Option<u64> {
125 self.headers.keys().max().copied()
126 }
127
128 pub fn min_block(&self) -> Option<u64> {
130 self.headers.keys().min().copied()
131 }
132
133 pub fn tip_header(&self) -> Option<SealedHeader<B::Header>> {
136 self.headers.get(&self.max_block()?).map(|h| SealedHeader::seal_slow(h.clone()))
137 }
138
139 pub fn has_canonical_blocks(&self) -> bool {
141 if self.headers.is_empty() {
142 return true
143 }
144 let mut nums = self.headers.keys().copied().collect::<Vec<_>>();
145 nums.sort_unstable();
146 let mut iter = nums.into_iter();
147 let mut lowest = iter.next().expect("not empty");
148 for next in iter {
149 if next != lowest + 1 {
150 return false
151 }
152 lowest = next;
153 }
154 true
155 }
156
157 pub fn with_bodies(mut self, bodies: HashMap<BlockHash, B::Body>) -> Self {
159 self.bodies = bodies;
160 self
161 }
162
163 pub fn with_headers(mut self, headers: HashMap<BlockNumber, B::Header>) -> Self {
165 self.headers = headers;
166 for (number, header) in &self.headers {
167 self.hash_to_number.insert(header.hash_slow(), *number);
168 }
169 self
170 }
171
172 pub fn headers_len(&self) -> usize {
174 self.headers.len()
175 }
176
177 pub fn bodies_len(&self) -> usize {
179 self.bodies.len()
180 }
181
182 pub fn headers_iter(&self) -> impl Iterator<Item = &B::Header> {
184 self.headers.values()
185 }
186
187 pub fn bodies_iter_mut(&mut self) -> impl Iterator<Item = (u64, &mut B::Body)> {
191 let bodies = &mut self.bodies;
192 let numbers = &self.hash_to_number;
193 bodies.iter_mut().map(|(hash, body)| (numbers[hash], body))
194 }
195
196 pub fn total_transactions(&self) -> usize {
198 self.bodies.iter().fold(0, |acc, (_, body)| acc + body.transactions().len())
199 }
200}
201
202struct FileClientBuilder<B: Block> {
203 pub consensus: Arc<dyn Consensus<B, Error = ConsensusError>>,
204 pub parent_header: Option<SealedHeader<B::Header>>,
205}
206
207impl<B: FullBlock<Header: reth_primitives_traits::BlockHeader>> FromReader
208 for FileClientBuilder<B>
209{
210 type Error = FileClientError;
211 type Output = FileClient<B>;
212
213 fn build<R>(
215 &self,
216 reader: R,
217 num_bytes: u64,
218 ) -> impl Future<Output = Result<DecodedFileChunk<Self::Output>, Self::Error>>
219 where
220 R: AsyncReadExt + Unpin,
221 {
222 let mut headers = HashMap::default();
223 let mut hash_to_number = HashMap::default();
224 let mut bodies = HashMap::default();
225
226 let mut stream =
228 FramedRead::with_capacity(reader, BlockFileCodec::<B>::default(), num_bytes as usize);
229
230 trace!(target: "downloaders::file",
231 target_num_bytes=num_bytes,
232 capacity=stream.read_buffer().capacity(),
233 "init decode stream"
234 );
235
236 let mut remaining_bytes = vec![];
237
238 let mut log_interval = 0;
239 let mut log_interval_start_block = 0;
240
241 let mut parent_header = self.parent_header.clone();
242
243 async move {
244 while let Some(block_res) = stream.next().await {
245 let block = match block_res {
246 Ok(block) => block,
247 Err(FileClientError::Rlp(err, bytes)) => {
248 trace!(target: "downloaders::file",
249 %err,
250 bytes_len=bytes.len(),
251 "partial block returned from decoding chunk"
252 );
253 remaining_bytes = bytes;
254 break
255 }
256 Err(err) => return Err(err),
257 };
258
259 let block = SealedBlock::seal_slow(block);
260
261 self.consensus.validate_header(block.sealed_header())?;
263 if let Some(parent) = &parent_header {
264 self.consensus.validate_header_against_parent(block.sealed_header(), parent)?;
265 parent_header = Some(block.sealed_header().clone());
266 }
267
268 self.consensus.validate_block_pre_execution(&block)?;
270
271 let block_hash = block.hash();
273 let block_number = block.number();
274 let (header, body) = block.split_sealed_header_body();
275 headers.insert(block_number, header.unseal());
276 hash_to_number.insert(block_hash, block_number);
277 bodies.insert(block_hash, body);
278
279 if log_interval == 0 {
280 trace!(target: "downloaders::file",
281 block_number,
282 "read first block"
283 );
284 log_interval_start_block = block_number;
285 } else if log_interval % 100_000 == 0 {
286 trace!(target: "downloaders::file",
287 blocks=?log_interval_start_block..=block_number,
288 "read blocks from file"
289 );
290 log_interval_start_block = block_number + 1;
291 }
292 log_interval += 1;
293 }
294
295 trace!(target: "downloaders::file", blocks = headers.len(), "Initialized file client");
296
297 Ok(DecodedFileChunk {
298 file_client: FileClient { headers, hash_to_number, bodies },
299 remaining_bytes,
300 highest_block: None,
301 })
302 }
303 }
304}
305
306impl<B: FullBlock> HeadersClient for FileClient<B> {
307 type Header = B::Header;
308 type Output = HeadersFut<B::Header>;
309
310 fn get_headers_with_priority(
311 &self,
312 request: HeadersRequest,
313 _priority: Priority,
314 ) -> Self::Output {
315 let mut headers = Vec::new();
317 trace!(target: "downloaders::file", request=?request, "Getting headers");
318
319 let start_num = match request.start {
320 BlockHashOrNumber::Hash(hash) => match self.hash_to_number.get(&hash) {
321 Some(num) => *num,
322 None => {
323 warn!(%hash, "Could not find starting block number for requested header hash");
324 return Box::pin(async move { Err(RequestError::BadResponse) })
325 }
326 },
327 BlockHashOrNumber::Number(num) => num,
328 };
329
330 let range = if request.limit == 1 {
331 Either::Left(start_num..start_num + 1)
332 } else {
333 match request.direction {
334 HeadersDirection::Rising => Either::Left(start_num..start_num + request.limit),
335 HeadersDirection::Falling => {
336 Either::Right((start_num - request.limit + 1..=start_num).rev())
337 }
338 }
339 };
340
341 trace!(target: "downloaders::file", range=?range, "Getting headers with range");
342
343 for block_number in range {
344 match self.headers.get(&block_number).cloned() {
345 Some(header) => headers.push(header),
346 None => {
347 warn!(number=%block_number, "Could not find header");
348 return Box::pin(async move { Err(RequestError::BadResponse) })
349 }
350 }
351 }
352
353 Box::pin(async move { Ok((PeerId::default(), headers).into()) })
354 }
355}
356
357impl<B: FullBlock> BodiesClient for FileClient<B> {
358 type Body = B::Body;
359 type Output = BodiesFut<B::Body>;
360
361 fn get_block_bodies_with_priority_and_range_hint(
362 &self,
363 hashes: Vec<B256>,
364 _priority: Priority,
365 _range_hint: Option<RangeInclusive<u64>>,
366 ) -> Self::Output {
367 let mut bodies = Vec::new();
369
370 for hash in hashes {
373 match self.bodies.get(&hash).cloned() {
374 Some(body) => bodies.push(body),
375 None => return Box::pin(async move { Err(RequestError::BadResponse) }),
376 }
377 }
378
379 Box::pin(async move { Ok((PeerId::default(), bodies).into()) })
380 }
381}
382
383impl<B: FullBlock> DownloadClient for FileClient<B> {
384 fn report_bad_message(&self, _peer_id: PeerId) {
385 trace!("Reported a bad message on a file client, the file may be corrupted or invalid");
386 }
388
389 fn num_connected_peers(&self) -> usize {
390 1
392 }
393}
394
395impl<B: FullBlock> BlockClient for FileClient<B> {
396 type Block = B;
397}
398
399#[derive(Debug)]
401enum FileReader {
402 Plain { file: File, remaining_bytes: u64 },
404 Gzip(GzipDecoder<BufReader<File>>),
406}
407
408impl FileReader {
409 async fn read(&mut self, buf: &mut [u8]) -> Result<usize, io::Error> {
411 match self {
412 Self::Plain { file, .. } => file.read(buf).await,
413 Self::Gzip(decoder) => decoder.read(buf).await,
414 }
415 }
416
417 async fn read_next_chunk(
420 &mut self,
421 chunk: &mut Vec<u8>,
422 chunk_byte_len: u64,
423 ) -> Result<Option<u64>, FileClientError> {
424 match self {
425 Self::Plain { .. } => self.read_plain_chunk(chunk, chunk_byte_len).await,
426 Self::Gzip(_) => {
427 Ok((self.read_gzip_chunk(chunk, chunk_byte_len).await?)
428 .then_some(chunk.len() as u64))
429 }
430 }
431 }
432
433 async fn read_plain_chunk(
434 &mut self,
435 chunk: &mut Vec<u8>,
436 chunk_byte_len: u64,
437 ) -> Result<Option<u64>, FileClientError> {
438 let Self::Plain { file, remaining_bytes } = self else {
439 unreachable!("read_plain_chunk should only be called on Plain variant")
440 };
441
442 if *remaining_bytes == 0 && chunk.is_empty() {
443 return Ok(None)
445 }
446
447 let chunk_target_len = chunk_byte_len.min(*remaining_bytes + chunk.len() as u64);
448 let old_bytes_len = chunk.len() as u64;
449
450 let new_read_bytes_target_len = chunk_target_len - old_bytes_len;
452
453 let prev_read_bytes_len = chunk.len();
455 chunk.extend(std::iter::repeat_n(0, new_read_bytes_target_len as usize));
456 let reader = &mut chunk[prev_read_bytes_len..];
457
458 let new_read_bytes_len = file.read_exact(reader).await? as u64;
460 let next_chunk_byte_len = chunk.len();
461
462 *remaining_bytes -= new_read_bytes_len;
464
465 debug!(target: "downloaders::file",
466 max_chunk_byte_len=chunk_byte_len,
467 prev_read_bytes_len,
468 new_read_bytes_target_len,
469 new_read_bytes_len,
470 next_chunk_byte_len,
471 remaining_file_byte_len=*remaining_bytes,
472 "new bytes were read from file"
473 );
474
475 Ok(Some(next_chunk_byte_len as u64))
476 }
477
478 async fn read_gzip_chunk(
480 &mut self,
481 chunk: &mut Vec<u8>,
482 chunk_byte_len: u64,
483 ) -> Result<bool, FileClientError> {
484 loop {
485 if chunk.len() >= chunk_byte_len as usize {
486 return Ok(true)
487 }
488
489 let mut buffer = vec![0u8; 64 * 1024];
490
491 match self.read(&mut buffer).await {
492 Ok(0) => return Ok(!chunk.is_empty()),
493 Ok(n) => {
494 buffer.truncate(n);
495 chunk.extend_from_slice(&buffer);
496 }
497 Err(e) => return Err(e.into()),
498 }
499 }
500 }
501}
502
503#[derive(Debug)]
505pub struct ChunkedFileReader {
506 file: FileReader,
508 chunk: Vec<u8>,
510 chunk_byte_len: u64,
512 highest_block: Option<u64>,
515}
516
517impl ChunkedFileReader {
518 pub async fn new<P: AsRef<Path>>(
522 path: P,
523 chunk_byte_len: Option<u64>,
524 ) -> Result<Self, FileClientError> {
525 let path = path.as_ref();
526 let file = File::open(path).await?;
527 let chunk_byte_len = chunk_byte_len.unwrap_or(DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE);
528
529 Self::from_file(
530 file,
531 chunk_byte_len,
532 path.extension()
533 .and_then(|ext| ext.to_str())
534 .is_some_and(|ext| ["gz", "gzip"].contains(&ext)),
535 )
536 .await
537 }
538
539 pub async fn from_file(
541 file: File,
542 chunk_byte_len: u64,
543 is_gzip: bool,
544 ) -> Result<Self, FileClientError> {
545 let file_reader = if is_gzip {
546 FileReader::Gzip(GzipDecoder::new(BufReader::new(file)))
547 } else {
548 let remaining_bytes = file.metadata().await?.len();
549 FileReader::Plain { file, remaining_bytes }
550 };
551
552 Ok(Self { file: file_reader, chunk: vec![], chunk_byte_len, highest_block: None })
553 }
554
555 async fn read_next_chunk(&mut self) -> Result<Option<u64>, FileClientError> {
558 self.file.read_next_chunk(&mut self.chunk, self.chunk_byte_len).await
559 }
560
561 pub async fn next_chunk<B: FullBlock>(
566 &mut self,
567 consensus: Arc<dyn Consensus<B, Error = ConsensusError>>,
568 parent_header: Option<SealedHeader<B::Header>>,
569 ) -> Result<Option<FileClient<B>>, FileClientError> {
570 let Some(chunk_len) = self.read_next_chunk().await? else { return Ok(None) };
571
572 let DecodedFileChunk { file_client, remaining_bytes, .. } =
574 FileClientBuilder { consensus, parent_header }
575 .build(&self.chunk[..], chunk_len)
576 .await?;
577
578 self.chunk = remaining_bytes;
580
581 Ok(Some(file_client))
582 }
583
584 pub async fn next_receipts_chunk<T>(&mut self) -> Result<Option<T>, T::Error>
586 where
587 T: FromReceiptReader,
588 {
589 let Some(next_chunk_byte_len) = self.read_next_chunk().await.map_err(|e| {
590 T::Error::from(match e {
591 FileClientError::Io(io_err) => io_err,
592 _ => io::Error::other(e.to_string()),
593 })
594 })?
595 else {
596 return Ok(None)
597 };
598
599 let DecodedFileChunk { file_client, remaining_bytes, highest_block } =
601 T::from_receipt_reader(&self.chunk[..], next_chunk_byte_len, self.highest_block)
602 .await?;
603
604 self.chunk = remaining_bytes;
606 self.highest_block = highest_block;
608
609 Ok(Some(file_client))
610 }
611}
612
613pub trait FromReader {
615 type Error: From<io::Error>;
617
618 type Output;
620
621 fn build<R>(
623 &self,
624 reader: R,
625 num_bytes: u64,
626 ) -> impl Future<Output = Result<DecodedFileChunk<Self::Output>, Self::Error>>
627 where
628 Self: Sized,
629 R: AsyncReadExt + Unpin;
630}
631
/// Result of decoding one chunk of a file.
#[derive(Debug)]
pub struct DecodedFileChunk<T> {
    /// The value built from the decoded part of the chunk.
    pub file_client: T,
    /// Bytes of the chunk that were not decoded.
    pub remaining_bytes: Vec<u8>,
    /// Highest block reported by the decoder, if it reports one.
    pub highest_block: Option<u64>,
}
643
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        bodies::{
            bodies::BodiesDownloaderBuilder,
            test_utils::{insert_headers, zip_blocks},
        },
        headers::{reverse_headers::ReverseHeadersDownloaderBuilder, test_utils::child_header},
        test_utils::{generate_bodies, generate_bodies_file},
    };
    use assert_matches::assert_matches;
    use async_compression::tokio::write::GzipEncoder;
    use futures_util::stream::StreamExt;
    use rand::Rng;
    use reth_consensus::{noop::NoopConsensus, test_utils::TestConsensus};
    use reth_ethereum_primitives::Block;
    use reth_network_p2p::{
        bodies::downloader::BodyDownloader,
        headers::downloader::{HeaderDownloader, SyncTarget},
    };
    use reth_provider::test_utils::create_test_provider_factory;
    use std::sync::Arc;
    use tokio::{
        fs::File,
        io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom},
    };

    /// Bodies injected with `with_bodies` are served to the bodies downloader.
    #[tokio::test]
    async fn streams_bodies_from_buffer() {
        let factory = create_test_provider_factory();
        let (headers, mut bodies) = generate_bodies(0..=19);

        insert_headers(&factory, &headers);

        // The client is built from an empty file; the bodies are injected afterwards.
        let file = tempfile::tempfile().unwrap();

        let client: Arc<FileClient<Block>> = Arc::new(
            FileClient::from_file(file.into(), NoopConsensus::arc())
                .await
                .unwrap()
                .with_bodies(bodies.clone()),
        );
        let mut downloader = BodiesDownloaderBuilder::default().build::<Block, _, _>(
            client.clone(),
            Arc::new(TestConsensus::default()),
            factory,
        );
        downloader.set_download_range(0..=19).expect("failed to set download range");

        assert_matches!(
            downloader.next().await,
            Some(Ok(res)) => assert_eq!(res, zip_blocks(headers.iter(), &mut bodies))
        );
    }

    /// Headers injected with `with_headers` are served to the reverse headers downloader.
    #[tokio::test]
    async fn download_headers_at_fork_head() {
        reth_tracing::init_test_tracing();

        let p3 = SealedHeader::default();
        let p2 = child_header(&p3);
        let p1 = child_header(&p2);
        let p0 = child_header(&p1);

        let file = tempfile::tempfile().unwrap();
        let client: Arc<FileClient<Block>> = Arc::new(
            FileClient::from_file(file.into(), NoopConsensus::arc()).await.unwrap().with_headers(
                HashMap::from([
                    (0u64, p0.clone_header()),
                    (1, p1.clone_header()),
                    (2, p2.clone_header()),
                    (3, p3.clone_header()),
                ]),
            ),
        );

        let mut downloader = ReverseHeadersDownloaderBuilder::default()
            .stream_batch_size(3)
            .request_limit(3)
            .build(Arc::clone(&client), Arc::new(TestConsensus::default()));
        downloader.update_local_head(p3.clone());
        downloader.update_sync_target(SyncTarget::Tip(p0.hash()));

        let headers = downloader.next().await.unwrap();
        assert_eq!(headers, Ok(vec![p0, p1, p2]));
        assert!(downloader.next().await.is_none());
        assert!(downloader.next().await.is_none());
    }

    /// Headers decoded from a generated file are served to the reverse headers downloader.
    #[tokio::test]
    async fn test_download_headers_from_file() {
        reth_tracing::init_test_tracing();

        let (file, headers, _) = generate_bodies_file(0..=19).await;
        let client: Arc<FileClient<Block>> =
            Arc::new(FileClient::from_file(file, NoopConsensus::arc()).await.unwrap());

        // The first generated header is the local head, the last one is the sync target.
        let mut header_downloader = ReverseHeadersDownloaderBuilder::default()
            .build(Arc::clone(&client), Arc::new(TestConsensus::default()));
        header_downloader.update_local_head(headers.first().unwrap().clone());
        header_downloader.update_sync_target(SyncTarget::Tip(headers.last().unwrap().hash()));

        let mut downloaded_headers = header_downloader.next().await.unwrap().unwrap();

        // Reversed before comparing with the generated order.
        downloaded_headers.reverse();

        // The local head itself is not part of the response.
        assert_eq!(downloaded_headers, headers[1..]);
    }

    /// Bodies decoded from a generated file are served to the bodies downloader.
    #[tokio::test]
    async fn test_download_bodies_from_file() {
        let factory = create_test_provider_factory();
        let (file, headers, mut bodies) = generate_bodies_file(0..=19).await;

        let client: Arc<FileClient<Block>> =
            Arc::new(FileClient::from_file(file, NoopConsensus::arc()).await.unwrap());

        insert_headers(&factory, &headers);

        let mut downloader = BodiesDownloaderBuilder::default().build::<Block, _, _>(
            client.clone(),
            Arc::new(TestConsensus::default()),
            factory,
        );
        downloader.set_download_range(0..=19).expect("failed to set download range");

        assert_matches!(
            downloader.next().await,
            Some(Ok(res)) => assert_eq!(res, zip_blocks(headers.iter(), &mut bodies))
        );
    }

    /// Reading a plain file in randomly sized chunks yields every header after the first.
    #[tokio::test]
    async fn test_chunk_download_headers_from_file() {
        reth_tracing::init_test_tracing();

        let (file, headers, _) = generate_bodies_file(0..=14).await;

        let chunk_byte_len = rand::rng().random_range(2000..=10_000);
        trace!(target: "downloaders::file::test", chunk_byte_len);

        let mut reader =
            ChunkedFileReader::from_file(file, chunk_byte_len as u64, false).await.unwrap();

        let mut downloaded_headers: Vec<SealedHeader> = vec![];

        let mut local_header = headers.first().unwrap().clone();

        while let Some(client) =
            reader.next_chunk::<Block>(NoopConsensus::arc(), None).await.unwrap()
        {
            let sync_target = client.tip_header().unwrap();

            let sync_target_hash = sync_target.hash();

            let mut header_downloader = ReverseHeadersDownloaderBuilder::default()
                .build(Arc::new(client), Arc::new(TestConsensus::default()));
            header_downloader.update_local_head(local_header.clone());
            header_downloader.update_sync_target(SyncTarget::Tip(sync_target_hash));

            let mut downloaded_headers_chunk = header_downloader.next().await.unwrap().unwrap();

            // The tip of this chunk is the local head for the next one.
            local_header = sync_target;

            // Reversed before being appended to the collected headers.
            downloaded_headers_chunk.reverse();
            downloaded_headers.extend_from_slice(&downloaded_headers_chunk);
        }

        // The first header is the initial local head and is never downloaded.
        assert_eq!(headers[1..], downloaded_headers);
    }

    /// Reading a gzip file in randomly sized chunks yields every header after the first.
    #[tokio::test]
    async fn test_chunk_download_headers_from_gzip_file() {
        reth_tracing::init_test_tracing();

        let (file, headers, _) = generate_bodies_file(0..=14).await;

        // Close the temp file handle but keep its path. The `TempPath` deletes the file when
        // it goes out of scope, so the gzip file does not stay behind in the temp directory.
        let gzip_path = tempfile::NamedTempFile::new().unwrap().into_temp_path();

        // Read back the generated, uncompressed contents.
        let mut original_file = file;
        original_file.seek(SeekFrom::Start(0)).await.unwrap();
        let mut original_content = Vec::new();
        original_file.read_to_end(&mut original_content).await.unwrap();

        // Write the gzip-compressed copy.
        let mut gzip_file = File::create(&gzip_path).await.unwrap();
        let mut encoder = GzipEncoder::new(&mut gzip_file);

        encoder.write_all(&original_content).await.unwrap();
        encoder.shutdown().await.unwrap();
        drop(gzip_file);

        // Reopen the compressed file for reading.
        let gzip_file = File::open(&gzip_path).await.unwrap();

        let chunk_byte_len = rand::rng().random_range(2000..=10_000);
        trace!(target: "downloaders::file::test", chunk_byte_len);

        let mut reader =
            ChunkedFileReader::from_file(gzip_file, chunk_byte_len as u64, true).await.unwrap();

        let mut downloaded_headers: Vec<SealedHeader> = vec![];

        let mut local_header = headers.first().unwrap().clone();

        while let Some(client) =
            reader.next_chunk::<Block>(NoopConsensus::arc(), None).await.unwrap()
        {
            // A chunk without any complete block yields an empty client.
            if client.headers_len() == 0 {
                continue;
            }

            let sync_target = client.tip_header().expect("tip_header should not be None");

            let sync_target_hash = sync_target.hash();

            let mut header_downloader = ReverseHeadersDownloaderBuilder::default()
                .build(Arc::new(client), Arc::new(TestConsensus::default()));
            header_downloader.update_local_head(local_header.clone());
            header_downloader.update_sync_target(SyncTarget::Tip(sync_target_hash));

            let mut downloaded_headers_chunk = header_downloader.next().await.unwrap().unwrap();

            // The tip of this chunk is the local head for the next one.
            local_header = sync_target;

            // Reversed before being appended to the collected headers.
            downloaded_headers_chunk.reverse();
            downloaded_headers.extend_from_slice(&downloaded_headers_chunk);
        }

        // The first header is the initial local head and is never downloaded.
        assert_eq!(headers[1..], downloaded_headers);
    }
}