1use alloy_consensus::BlockHeader;
2use alloy_eips::BlockHashOrNumber;
3use alloy_primitives::{BlockHash, BlockNumber, Sealable, B256};
4use async_compression::tokio::bufread::GzipDecoder;
5use futures::Future;
6use itertools::Either;
7use reth_consensus::{Consensus, ConsensusError};
8use reth_network_p2p::{
9 bodies::client::{BodiesClient, BodiesFut},
10 download::DownloadClient,
11 error::RequestError,
12 headers::client::{HeadersClient, HeadersDirection, HeadersFut, HeadersRequest},
13 priority::Priority,
14 BlockClient,
15};
16use reth_network_peers::PeerId;
17use reth_primitives_traits::{Block, BlockBody, FullBlock, SealedBlock, SealedHeader};
18use std::{collections::HashMap, io, ops::RangeInclusive, path::Path, sync::Arc};
19use thiserror::Error;
20use tokio::{
21 fs::File,
22 io::{AsyncReadExt, BufReader},
23};
24use tokio_stream::StreamExt;
25use tokio_util::codec::FramedRead;
26use tracing::{debug, trace, warn};
27
28use super::file_codec::BlockFileCodec;
29use crate::receipt_file_client::FromReceiptReader;
30
/// Default byte length of a chunk to read from a chain file: 1 GB (10^9 bytes).
pub const DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE: u64 = 1_000_000_000;
35
/// Client that serves headers and bodies decoded from a chain file, held in memory.
///
/// Implements the p2p download-client traits so it can stand in for a network
/// peer when importing blocks from a local file.
#[derive(Debug, Clone)]
pub struct FileClient<B: Block> {
    /// All headers decoded from the file, keyed by block number.
    headers: HashMap<BlockNumber, B::Header>,

    /// Maps block hash to block number, for hash-based header lookups.
    hash_to_number: HashMap<BlockHash, BlockNumber>,

    /// All bodies decoded from the file, keyed by block hash.
    bodies: HashMap<BlockHash, B::Body>,
}
58
/// Errors that can occur while reading, decoding, or validating a chain file.
#[derive(Debug, Error)]
pub enum FileClientError {
    /// A decoded block failed consensus validation.
    #[error(transparent)]
    Consensus(#[from] ConsensusError),

    /// An I/O error occurred while reading the file.
    #[error(transparent)]
    Io(#[from] std::io::Error),

    /// RLP decoding failed; carries the decode error plus the undecoded bytes
    /// (a chunk boundary may have split a block — the bytes are carried over
    /// and retried with the next chunk).
    #[error("{0}")]
    Rlp(alloy_rlp::Error, Vec<u8>),

    /// A custom error with a static message.
    #[error("{0}")]
    Custom(&'static str),
}
78
79impl From<&'static str> for FileClientError {
80 fn from(value: &'static str) -> Self {
81 Self::Custom(value)
82 }
83}
84
85impl<B: FullBlock> FileClient<B> {
86 pub fn from_blocks(blocks: impl IntoIterator<Item = SealedBlock<B>>) -> Self {
88 let blocks: Vec<_> = blocks.into_iter().collect();
89 let capacity = blocks.len();
90
91 let mut headers = HashMap::with_capacity(capacity);
92 let mut hash_to_number = HashMap::with_capacity(capacity);
93 let mut bodies = HashMap::with_capacity(capacity);
94
95 for block in blocks {
96 let number = block.number();
97 let hash = block.hash();
98 let (header, body) = block.split_sealed_header_body();
99
100 headers.insert(number, header.into_header());
101 hash_to_number.insert(hash, number);
102 bodies.insert(hash, body);
103 }
104
105 Self { headers, hash_to_number, bodies }
106 }
107
108 pub async fn new<P: AsRef<Path>>(
110 path: P,
111 consensus: Arc<dyn Consensus<B>>,
112 ) -> Result<Self, FileClientError> {
113 let file = File::open(path).await?;
114 Self::from_file(file, consensus).await
115 }
116
117 pub(crate) async fn from_file(
119 mut file: File,
120 consensus: Arc<dyn Consensus<B>>,
121 ) -> Result<Self, FileClientError> {
122 let metadata = file.metadata().await?;
124 let file_len = metadata.len();
125
126 let mut reader = vec![];
127 file.read_to_end(&mut reader).await?;
128
129 Ok(FileClientBuilder { consensus, parent_header: None }
130 .build(&reader[..], file_len)
131 .await?
132 .file_client)
133 }
134
135 pub fn tip(&self) -> Option<B256> {
137 self.headers.get(&self.max_block()?).map(|h| h.hash_slow())
138 }
139
140 pub fn start(&self) -> Option<B256> {
142 self.headers.get(&self.min_block()?).map(|h| h.hash_slow())
143 }
144
145 pub fn max_block(&self) -> Option<u64> {
147 self.headers.keys().max().copied()
148 }
149
150 pub fn min_block(&self) -> Option<u64> {
152 self.headers.keys().min().copied()
153 }
154
155 pub fn tip_header(&self) -> Option<SealedHeader<B::Header>> {
158 self.headers.get(&self.max_block()?).map(|h| SealedHeader::seal_slow(h.clone()))
159 }
160
161 pub fn has_canonical_blocks(&self) -> bool {
163 if self.headers.is_empty() {
164 return true
165 }
166 let mut nums = self.headers.keys().copied().collect::<Vec<_>>();
167 nums.sort_unstable();
168 let mut iter = nums.into_iter();
169 let mut lowest = iter.next().expect("not empty");
170 for next in iter {
171 if next != lowest + 1 {
172 return false
173 }
174 lowest = next;
175 }
176 true
177 }
178
179 pub fn with_bodies(mut self, bodies: HashMap<BlockHash, B::Body>) -> Self {
181 self.bodies = bodies;
182 self
183 }
184
185 pub fn with_headers(mut self, headers: HashMap<BlockNumber, B::Header>) -> Self {
187 self.headers = headers;
188 for (number, header) in &self.headers {
189 self.hash_to_number.insert(header.hash_slow(), *number);
190 }
191 self
192 }
193
194 pub fn headers_len(&self) -> usize {
196 self.headers.len()
197 }
198
199 pub fn bodies_len(&self) -> usize {
201 self.bodies.len()
202 }
203
204 pub fn headers_iter(&self) -> impl Iterator<Item = &B::Header> {
206 self.headers.values()
207 }
208
209 pub fn bodies_iter_mut(&mut self) -> impl Iterator<Item = (u64, &mut B::Body)> {
213 let bodies = &mut self.bodies;
214 let numbers = &self.hash_to_number;
215 bodies.iter_mut().map(|(hash, body)| (numbers[hash], body))
216 }
217
218 pub fn total_transactions(&self) -> usize {
220 self.bodies.iter().fold(0, |acc, (_, body)| acc + body.transactions().len())
221 }
222}
223
/// Builder that decodes a [`FileClient`] from a byte reader, validating blocks
/// with the given consensus instance.
struct FileClientBuilder<B: Block> {
    /// Consensus used to validate each decoded block.
    pub consensus: Arc<dyn Consensus<B>>,
    /// Optional parent of the first block, enabling parent-child header validation.
    pub parent_header: Option<SealedHeader<B::Header>>,
}
228
impl<B: FullBlock<Header: reth_primitives_traits::BlockHeader>> FromReader
    for FileClientBuilder<B>
{
    type Error = FileClientError;
    type Output = FileClient<B>;

    /// Decodes RLP-encoded blocks from `reader` into a [`FileClient`],
    /// validating each block with the configured consensus instance.
    ///
    /// Decoding stops at the first partial block; its bytes are returned in
    /// [`DecodedFileChunk::remaining_bytes`] so the caller can retry them
    /// together with the next chunk of data.
    fn build<R>(
        &self,
        reader: R,
        num_bytes: u64,
    ) -> impl Future<Output = Result<DecodedFileChunk<Self::Output>, Self::Error>>
    where
        R: AsyncReadExt + Unpin,
    {
        let mut headers = HashMap::default();
        let mut hash_to_number = HashMap::default();
        let mut bodies = HashMap::default();

        // Frame the reader with the block codec; capacity is sized to the
        // whole chunk so the read buffer does not need to grow.
        let mut stream =
            FramedRead::with_capacity(reader, BlockFileCodec::<B>::default(), num_bytes as usize);

        trace!(target: "downloaders::file",
            target_num_bytes=num_bytes,
            capacity=stream.read_buffer().capacity(),
            "init decode stream"
        );

        // Undecoded bytes of a trailing partial block, if any.
        let mut remaining_bytes = vec![];

        // Counters used only for periodic progress logging.
        let mut log_interval = 0;
        let mut log_interval_start_block = 0;

        let mut parent_header = self.parent_header.clone();

        async move {
            while let Some(block_res) = stream.next().await {
                let block = match block_res {
                    Ok(block) => block,
                    Err(FileClientError::Rlp(err, bytes)) => {
                        // A block was cut off at the chunk boundary: keep its
                        // bytes and stop decoding this chunk.
                        trace!(target: "downloaders::file",
                            %err,
                            bytes_len=bytes.len(),
                            "partial block returned from decoding chunk"
                        );
                        remaining_bytes = bytes;
                        break
                    }
                    Err(err) => return Err(err),
                };

                let block = SealedBlock::seal_slow(block);

                // Standalone header validation first, then (if a parent is
                // known) parent-child validation, then pre-execution checks.
                self.consensus.validate_header(block.sealed_header())?;
                // NOTE(review): when `parent_header` starts out `None` it is
                // never populated, so parent-child validation is skipped for
                // the entire chunk — confirm this is intended.
                if let Some(parent) = &parent_header {
                    self.consensus.validate_header_against_parent(block.sealed_header(), parent)?;
                    parent_header = Some(block.sealed_header().clone());
                }

                self.consensus.validate_block_pre_execution(&block)?;

                // Index the block: header by number, body by hash.
                let block_hash = block.hash();
                let block_number = block.number();
                let (header, body) = block.split_sealed_header_body();
                headers.insert(block_number, header.unseal());
                hash_to_number.insert(block_hash, block_number);
                bodies.insert(block_hash, body);

                // Log the first block, then every 100_000 blocks thereafter.
                if log_interval == 0 {
                    trace!(target: "downloaders::file",
                        block_number,
                        "read first block"
                    );
                    log_interval_start_block = block_number;
                } else if log_interval % 100_000 == 0 {
                    trace!(target: "downloaders::file",
                        blocks=?log_interval_start_block..=block_number,
                        "read blocks from file"
                    );
                    log_interval_start_block = block_number + 1;
                }
                log_interval += 1;
            }

            trace!(target: "downloaders::file", blocks = headers.len(), "Initialized file client");

            Ok(DecodedFileChunk {
                file_client: FileClient { headers, hash_to_number, bodies },
                remaining_bytes,
                highest_block: None,
            })
        }
    }
}
327
328impl<B: FullBlock> HeadersClient for FileClient<B> {
329 type Header = B::Header;
330 type Output = HeadersFut<B::Header>;
331
332 fn get_headers_with_priority(
333 &self,
334 request: HeadersRequest,
335 _priority: Priority,
336 ) -> Self::Output {
337 let mut headers = Vec::new();
339 trace!(target: "downloaders::file", request=?request, "Getting headers");
340
341 let start_num = match request.start {
342 BlockHashOrNumber::Hash(hash) => match self.hash_to_number.get(&hash) {
343 Some(num) => *num,
344 None => {
345 warn!(%hash, "Could not find starting block number for requested header hash");
346 return Box::pin(async move { Err(RequestError::BadResponse) })
347 }
348 },
349 BlockHashOrNumber::Number(num) => num,
350 };
351
352 let range = if request.limit == 1 {
353 Either::Left(start_num..start_num + 1)
354 } else {
355 match request.direction {
356 HeadersDirection::Rising => Either::Left(start_num..start_num + request.limit),
357 HeadersDirection::Falling => {
358 Either::Right((start_num - request.limit + 1..=start_num).rev())
359 }
360 }
361 };
362
363 trace!(target: "downloaders::file", range=?range, "Getting headers with range");
364
365 for block_number in range {
366 match self.headers.get(&block_number).cloned() {
367 Some(header) => headers.push(header),
368 None => {
369 warn!(number=%block_number, "Could not find header");
370 return Box::pin(async move { Err(RequestError::BadResponse) })
371 }
372 }
373 }
374
375 Box::pin(async move { Ok((PeerId::default(), headers).into()) })
376 }
377}
378
379impl<B: FullBlock> BodiesClient for FileClient<B> {
380 type Body = B::Body;
381 type Output = BodiesFut<B::Body>;
382
383 fn get_block_bodies_with_priority_and_range_hint(
384 &self,
385 hashes: Vec<B256>,
386 _priority: Priority,
387 _range_hint: Option<RangeInclusive<u64>>,
388 ) -> Self::Output {
389 let mut bodies = Vec::new();
391
392 for hash in hashes {
395 match self.bodies.get(&hash).cloned() {
396 Some(body) => bodies.push(body),
397 None => return Box::pin(async move { Err(RequestError::BadResponse) }),
398 }
399 }
400
401 Box::pin(async move { Ok((PeerId::default(), bodies).into()) })
402 }
403}
404
impl<B: FullBlock> DownloadClient for FileClient<B> {
    /// No peer to penalize: a bad message means the file itself is suspect,
    /// so only a trace line is emitted.
    fn report_bad_message(&self, _peer_id: PeerId) {
        trace!("Reported a bad message on a file client, the file may be corrupted or invalid");
    }

    /// The file stands in for exactly one "peer".
    fn num_connected_peers(&self) -> usize {
        1
    }
}
416
/// Ties the header and body client impls together for full-block downloads.
impl<B: FullBlock> BlockClient for FileClient<B> {
    type Block = B;
}
420
/// Wrapper over the underlying file handle, supporting plain and gzipped input.
#[derive(Debug)]
enum FileReader {
    /// Uncompressed file; tracks how many bytes are left to read.
    Plain { file: File, remaining_bytes: u64 },
    /// Gzip-compressed file, decompressed on the fly while reading.
    Gzip(GzipDecoder<BufReader<File>>),
}
429
impl FileReader {
    /// Reads up to `buf.len()` bytes from the underlying (possibly gzipped) source.
    async fn read(&mut self, buf: &mut [u8]) -> Result<usize, io::Error> {
        match self {
            Self::Plain { file, .. } => file.read(buf).await,
            Self::Gzip(decoder) => decoder.read(buf).await,
        }
    }

    /// Appends the next chunk of bytes to `chunk`, up to roughly `chunk_byte_len`.
    ///
    /// Returns `Ok(None)` when the source is exhausted and nothing is buffered,
    /// otherwise `Some` with the new total length of `chunk`.
    async fn read_next_chunk(
        &mut self,
        chunk: &mut Vec<u8>,
        chunk_byte_len: u64,
    ) -> Result<Option<u64>, FileClientError> {
        match self {
            Self::Plain { .. } => self.read_plain_chunk(chunk, chunk_byte_len).await,
            Self::Gzip(_) => {
                // The gzip path reports "did we read anything"; map that onto
                // the same Option<length> contract as the plain path.
                Ok((self.read_gzip_chunk(chunk, chunk_byte_len).await?)
                    .then_some(chunk.len() as u64))
            }
        }
    }

    /// Reads an exactly-sized chunk from an uncompressed file, using the
    /// tracked remaining file length to bound the read.
    async fn read_plain_chunk(
        &mut self,
        chunk: &mut Vec<u8>,
        chunk_byte_len: u64,
    ) -> Result<Option<u64>, FileClientError> {
        let Self::Plain { file, remaining_bytes } = self else {
            unreachable!("read_plain_chunk should only be called on Plain variant")
        };

        // Done: file fully consumed and no leftover bytes to re-decode.
        if *remaining_bytes == 0 && chunk.is_empty() {
            return Ok(None)
        }

        // Target chunk size, capped by what the file can still provide.
        // NOTE(review): assumes the leftover `chunk.len()` never exceeds
        // `chunk_byte_len`; otherwise the subtraction below would underflow —
        // confirm leftovers are always smaller than one chunk.
        let chunk_target_len = chunk_byte_len.min(*remaining_bytes + chunk.len() as u64);
        let old_bytes_len = chunk.len() as u64;

        // Number of fresh bytes to pull from the file this round.
        let new_read_bytes_target_len = chunk_target_len - old_bytes_len;

        // Grow the buffer with zeroes, then fill the new tail from the file.
        let prev_read_bytes_len = chunk.len();
        chunk.extend(std::iter::repeat_n(0, new_read_bytes_target_len as usize));
        let reader = &mut chunk[prev_read_bytes_len..];

        // `read_exact` fills the whole slice or errors, so on success this
        // equals the slice length.
        let new_read_bytes_len = file.read_exact(reader).await? as u64;
        let next_chunk_byte_len = chunk.len();

        *remaining_bytes -= new_read_bytes_len;

        debug!(target: "downloaders::file",
            max_chunk_byte_len=chunk_byte_len,
            prev_read_bytes_len,
            new_read_bytes_target_len,
            new_read_bytes_len,
            next_chunk_byte_len,
            remaining_file_byte_len=*remaining_bytes,
            "new bytes were read from file"
        );

        Ok(Some(next_chunk_byte_len as u64))
    }

    /// Decompresses into `chunk` until it holds at least `chunk_byte_len`
    /// bytes or the stream ends.
    ///
    /// Returns `false` only when the stream ended with `chunk` still empty.
    async fn read_gzip_chunk(
        &mut self,
        chunk: &mut Vec<u8>,
        chunk_byte_len: u64,
    ) -> Result<bool, FileClientError> {
        // Fixed 64 KiB scratch buffer for each decompression read.
        let mut buffer = vec![0u8; 64 * 1024];
        loop {
            if chunk.len() >= chunk_byte_len as usize {
                return Ok(true)
            }

            match self.read(&mut buffer).await {
                Ok(0) => return Ok(!chunk.is_empty()),
                Ok(n) => {
                    chunk.extend_from_slice(&buffer[..n]);
                }
                Err(e) => return Err(e.into()),
            }
        }
    }
}
522
/// Reads a chain file in chunks so very large files need not fit in memory at once.
#[derive(Debug)]
pub struct ChunkedFileReader {
    /// Underlying (plain or gzip) file reader.
    file: FileReader,
    /// Byte buffer carried between chunks: leftover undecoded bytes from the
    /// previous chunk, topped up with freshly read bytes.
    chunk: Vec<u8>,
    /// Target byte length of each chunk.
    chunk_byte_len: u64,
    /// Highest block number decoded so far; only used by the receipts path.
    highest_block: Option<u64>,
}
536
impl ChunkedFileReader {
    /// Opens the file at `path` for chunked reading. Gzip decompression is
    /// enabled when the file extension is `gz` or `gzip`.
    ///
    /// `chunk_byte_len` defaults to [`DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE`] when `None`.
    pub async fn new<P: AsRef<Path>>(
        path: P,
        chunk_byte_len: Option<u64>,
    ) -> Result<Self, FileClientError> {
        let path = path.as_ref();
        let file = File::open(path).await?;
        let chunk_byte_len = chunk_byte_len.unwrap_or(DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE);

        Self::from_file(
            file,
            chunk_byte_len,
            // Detect gzip input from the file extension.
            path.extension()
                .and_then(|ext| ext.to_str())
                .is_some_and(|ext| ["gz", "gzip"].contains(&ext)),
        )
        .await
    }

    /// Wraps an already-open file for chunked reading.
    pub async fn from_file(
        file: File,
        chunk_byte_len: u64,
        is_gzip: bool,
    ) -> Result<Self, FileClientError> {
        let file_reader = if is_gzip {
            FileReader::Gzip(GzipDecoder::new(BufReader::new(file)))
        } else {
            // For plain files the remaining length is known up front.
            let remaining_bytes = file.metadata().await?.len();
            FileReader::Plain { file, remaining_bytes }
        };

        Ok(Self { file: file_reader, chunk: vec![], chunk_byte_len, highest_block: None })
    }

    /// Reads the next chunk into the internal buffer; `None` means EOF.
    async fn read_next_chunk(&mut self) -> Result<Option<u64>, FileClientError> {
        self.file.read_next_chunk(&mut self.chunk, self.chunk_byte_len).await
    }

    /// Decodes the next chunk of blocks into a [`FileClient`].
    ///
    /// Returns `Ok(None)` when the file is exhausted. Bytes at the end of the
    /// chunk that did not form a complete block are kept and retried together
    /// with the next chunk.
    pub async fn next_chunk<B: FullBlock>(
        &mut self,
        consensus: Arc<dyn Consensus<B>>,
        parent_header: Option<SealedHeader<B::Header>>,
    ) -> Result<Option<FileClient<B>>, FileClientError> {
        let Some(chunk_len) = self.read_next_chunk().await? else { return Ok(None) };

        let DecodedFileChunk { file_client, remaining_bytes, .. } =
            FileClientBuilder { consensus, parent_header }
                .build(&self.chunk[..], chunk_len)
                .await?;

        // Carry undecoded trailing bytes over into the next chunk.
        self.chunk = remaining_bytes;

        Ok(Some(file_client))
    }

    /// Decodes the next chunk of receipts via `T`'s reader implementation.
    ///
    /// I/O read errors are passed through unchanged; other read errors are
    /// converted into an `io::Error` first, since `T::Error` is only required
    /// to be constructible from `io::Error`.
    pub async fn next_receipts_chunk<T>(&mut self) -> Result<Option<T>, T::Error>
    where
        T: FromReceiptReader,
    {
        let Some(next_chunk_byte_len) = self.read_next_chunk().await.map_err(|e| {
            T::Error::from(match e {
                FileClientError::Io(io_err) => io_err,
                _ => io::Error::other(e.to_string()),
            })
        })?
        else {
            return Ok(None)
        };

        let DecodedFileChunk { file_client, remaining_bytes, highest_block } =
            T::from_receipt_reader(&self.chunk[..], next_chunk_byte_len, self.highest_block)
                .await?;

        // Carry undecoded trailing bytes into the next chunk and track the
        // highest decoded block across chunks.
        self.chunk = remaining_bytes;
        self.highest_block = highest_block;

        Ok(Some(file_client))
    }
}
632
/// Builds an output (e.g. a [`FileClient`]) by decoding bytes from an async reader.
pub trait FromReader {
    /// Error type; must be constructible from I/O errors.
    type Error: From<io::Error>;

    /// The decoded output type.
    type Output;

    /// Decodes up to `num_bytes` from `reader` into the output, returning any
    /// trailing bytes that did not form a complete item.
    fn build<R>(
        &self,
        reader: R,
        num_bytes: u64,
    ) -> impl Future<Output = Result<DecodedFileChunk<Self::Output>, Self::Error>>
    where
        Self: Sized,
        R: AsyncReadExt + Unpin;
}
651
/// Result of decoding one chunk of a chain file.
#[derive(Debug)]
pub struct DecodedFileChunk<T> {
    /// The client holding everything decoded from this chunk.
    pub file_client: T,
    /// Bytes at the end of the chunk that did not decode to a complete item;
    /// they are carried over and prepended to the next chunk.
    pub remaining_bytes: Vec<u8>,
    /// Highest decoded block number, if the decoder tracks one.
    pub highest_block: Option<u64>,
}
663
664#[cfg(test)]
665mod tests {
666 use super::*;
667 use crate::{
668 bodies::{
669 bodies::BodiesDownloaderBuilder,
670 test_utils::{insert_headers, zip_blocks},
671 },
672 headers::{reverse_headers::ReverseHeadersDownloaderBuilder, test_utils::child_header},
673 test_utils::{generate_bodies, generate_bodies_file},
674 };
675 use assert_matches::assert_matches;
676 use async_compression::tokio::write::GzipEncoder;
677 use futures_util::stream::StreamExt;
678 use rand::Rng;
679 use reth_consensus::{noop::NoopConsensus, test_utils::TestConsensus};
680 use reth_ethereum_primitives::Block;
681 use reth_network_p2p::{
682 bodies::downloader::BodyDownloader,
683 headers::downloader::{HeaderDownloader, SyncTarget},
684 };
685 use reth_provider::test_utils::create_test_provider_factory;
686 use std::sync::Arc;
687 use tokio::{
688 fs::File,
689 io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom},
690 };
691
    // Bodies stage: a file client seeded with in-memory bodies should serve
    // them to the bodies downloader over the full 0..=19 range.
    #[tokio::test]
    async fn streams_bodies_from_buffer() {
        // Setup: headers go into the provider, bodies into the client.
        let factory = create_test_provider_factory();
        let (headers, mut bodies) = generate_bodies(0..=19);

        insert_headers(&factory, &headers);

        // An empty file: the bodies are injected directly via `with_bodies`.
        let file = tempfile::tempfile().unwrap();

        let client: Arc<FileClient<Block>> = Arc::new(
            FileClient::from_file(file.into(), NoopConsensus::arc())
                .await
                .unwrap()
                .with_bodies(bodies.clone().into_iter().collect()),
        );
        let mut downloader = BodiesDownloaderBuilder::default().build::<Block, _, _>(
            client.clone(),
            Arc::new(TestConsensus::default()),
            factory,
        );
        downloader.set_download_range(0..=19).expect("failed to set download range");

        assert_matches!(
            downloader.next().await,
            Some(Ok(res)) => assert_eq!(res, zip_blocks(headers.iter(), &mut bodies))
        );
    }
721
    // Header downloader should fetch p0..p2 from the client when the local
    // head is p3 and the sync target is the tip p0.
    #[tokio::test]
    async fn download_headers_at_fork_head() {
        reth_tracing::init_test_tracing();

        // Build a 4-header chain: p3 (oldest) -> p2 -> p1 -> p0 (tip).
        let p3 = SealedHeader::default();
        let p2 = child_header(&p3);
        let p1 = child_header(&p2);
        let p0 = child_header(&p1);

        // Empty file; headers are injected directly via `with_headers`.
        let file = tempfile::tempfile().unwrap();
        let client: Arc<FileClient<Block>> = Arc::new(
            FileClient::from_file(file.into(), NoopConsensus::arc()).await.unwrap().with_headers(
                HashMap::from([
                    (0u64, p0.clone_header()),
                    (1, p1.clone_header()),
                    (2, p2.clone_header()),
                    (3, p3.clone_header()),
                ]),
            ),
        );

        let mut downloader = ReverseHeadersDownloaderBuilder::default()
            .stream_batch_size(3)
            .request_limit(3)
            .build(Arc::clone(&client), Arc::new(TestConsensus::default()));
        downloader.update_local_head(p3.clone());
        downloader.update_sync_target(SyncTarget::Tip(p0.hash()));

        // One batch of 3 headers (p0..p2), then the stream is exhausted.
        let headers = downloader.next().await.unwrap();
        assert_eq!(headers.unwrap(), vec![p0, p1, p2]);
        assert!(downloader.next().await.is_none());
        assert!(downloader.next().await.is_none());
    }
755
    // Header downloader should stream all headers decoded from a generated
    // chain file, tip-to-head (hence the reverse before comparing).
    #[tokio::test]
    async fn test_download_headers_from_file() {
        reth_tracing::init_test_tracing();

        // Generate blocks 0..=19 into a file and decode them into a client.
        let (file, headers, _) = generate_bodies_file(0..=19).await;
        let client: Arc<FileClient<Block>> =
            Arc::new(FileClient::from_file(file, NoopConsensus::arc()).await.unwrap());

        let mut header_downloader = ReverseHeadersDownloaderBuilder::default()
            .build(Arc::clone(&client), Arc::new(TestConsensus::default()));
        header_downloader.update_local_head(headers.first().unwrap().clone());
        header_downloader.update_sync_target(SyncTarget::Tip(headers.last().unwrap().hash()));

        let mut downloaded_headers = header_downloader.next().await.unwrap().unwrap();

        // Downloader yields highest-first; flip to ascending for comparison.
        downloaded_headers.reverse();

        // The local head (block 0) itself is not re-downloaded.
        assert_eq!(downloaded_headers, headers[1..]);
    }
781
    // Bodies downloader should serve the bodies decoded from a generated file.
    #[tokio::test]
    async fn test_download_bodies_from_file() {
        // Setup: decode blocks 0..=19 from file, insert headers into provider.
        let factory = create_test_provider_factory();
        let (file, headers, mut bodies) = generate_bodies_file(0..=19).await;

        let client: Arc<FileClient<Block>> =
            Arc::new(FileClient::from_file(file, NoopConsensus::arc()).await.unwrap());

        insert_headers(&factory, &headers);

        let mut downloader = BodiesDownloaderBuilder::default().build::<Block, _, _>(
            client.clone(),
            Arc::new(TestConsensus::default()),
            factory,
        );
        downloader.set_download_range(0..=19).expect("failed to set download range");

        assert_matches!(
            downloader.next().await,
            Some(Ok(res)) => assert_eq!(res, zip_blocks(headers.iter(), &mut bodies))
        );
    }
807
    // Chunked reader should decode headers across randomly sized chunks and,
    // chunk by chunk, reproduce the full header chain.
    #[tokio::test]
    async fn test_chunk_download_headers_from_file() {
        reth_tracing::init_test_tracing();

        let (file, headers, _) = generate_bodies_file(0..=14).await;

        // Use a random, deliberately small chunk size to force several chunks.
        let chunk_byte_len = rand::rng().random_range(2000..=10_000);
        trace!(target: "downloaders::file::test", chunk_byte_len);

        let mut reader =
            ChunkedFileReader::from_file(file, chunk_byte_len as u64, false).await.unwrap();

        let mut downloaded_headers: Vec<SealedHeader> = vec![];

        // The local head advances to each chunk's tip as chunks are consumed.
        let mut local_header = headers.first().unwrap().clone();

        while let Some(client) =
            reader.next_chunk::<Block>(NoopConsensus::arc(), None).await.unwrap()
        {
            let sync_target = client.tip_header().unwrap();

            let sync_target_hash = sync_target.hash();

            // Download this chunk's headers, from its tip backwards.
            let mut header_downloader = ReverseHeadersDownloaderBuilder::default()
                .build(Arc::new(client), Arc::new(TestConsensus::default()));
            header_downloader.update_local_head(local_header.clone());
            header_downloader.update_sync_target(SyncTarget::Tip(sync_target_hash));

            let mut downloaded_headers_chunk = header_downloader.next().await.unwrap().unwrap();

            local_header = sync_target;

            // Downloader yields highest-first; restore ascending order.
            downloaded_headers_chunk.reverse();
            downloaded_headers.extend_from_slice(&downloaded_headers_chunk);
        }

        // The initial local head (block 0) is not re-downloaded.
        assert_eq!(headers[1..], downloaded_headers);
    }
856
    // Same as the plain chunked test, but the chain file is gzip-compressed
    // first and decompressed on the fly by the reader.
    #[tokio::test]
    async fn test_chunk_download_headers_from_gzip_file() {
        reth_tracing::init_test_tracing();

        let (file, headers, _) = generate_bodies_file(0..=14).await;

        // Reserve a temp path for the gzip copy of the chain file, then drop
        // the handle so the path can be re-created below.
        let gzip_temp_file = tempfile::NamedTempFile::new().unwrap();
        let gzip_path = gzip_temp_file.path().to_owned();
        drop(gzip_temp_file); let mut original_file = file;
        original_file.seek(SeekFrom::Start(0)).await.unwrap();
        let mut original_content = Vec::new();
        original_file.read_to_end(&mut original_content).await.unwrap();

        // Gzip-encode the original file contents into the temp path.
        let mut gzip_file = File::create(&gzip_path).await.unwrap();
        let mut encoder = GzipEncoder::new(&mut gzip_file);

        encoder.write_all(&original_content).await.unwrap();
        encoder.shutdown().await.unwrap();
        drop(gzip_file);

        let gzip_file = File::open(&gzip_path).await.unwrap();

        // Use a random, deliberately small chunk size to force several chunks.
        let chunk_byte_len = rand::rng().random_range(2000..=10_000);
        trace!(target: "downloaders::file::test", chunk_byte_len);

        let mut reader =
            ChunkedFileReader::from_file(gzip_file, chunk_byte_len as u64, true).await.unwrap();

        let mut downloaded_headers: Vec<SealedHeader> = vec![];

        // The local head advances to each chunk's tip as chunks are consumed.
        let mut local_header = headers.first().unwrap().clone();

        while let Some(client) =
            reader.next_chunk::<Block>(NoopConsensus::arc(), None).await.unwrap()
        {
            // A gzip chunk may decode to zero complete blocks; skip it.
            if client.headers_len() == 0 {
                continue;
            }

            let sync_target = client.tip_header().expect("tip_header should not be None");

            let sync_target_hash = sync_target.hash();

            // Download this chunk's headers, from its tip backwards.
            let mut header_downloader = ReverseHeadersDownloaderBuilder::default()
                .build(Arc::new(client), Arc::new(TestConsensus::default()));
            header_downloader.update_local_head(local_header.clone());
            header_downloader.update_sync_target(SyncTarget::Tip(sync_target_hash));

            let mut downloaded_headers_chunk = header_downloader.next().await.unwrap().unwrap();

            local_header = sync_target;

            // Downloader yields highest-first; restore ascending order.
            downloaded_headers_chunk.reverse();
            downloaded_headers.extend_from_slice(&downloaded_headers_chunk);
        }

        // The initial local head (block 0) is not re-downloaded.
        assert_eq!(headers[1..], downloaded_headers);
    }
931}