1use alloy_consensus::BlockHeader;
2use alloy_eips::BlockHashOrNumber;
3use alloy_primitives::{BlockHash, BlockNumber, Sealable, B256};
4use async_compression::tokio::bufread::GzipDecoder;
5use futures::Future;
6use itertools::{Either, Itertools};
7use reth_consensus::{Consensus, ConsensusError};
8use reth_network_p2p::{
9 bodies::client::{BodiesClient, BodiesFut},
10 download::DownloadClient,
11 error::RequestError,
12 headers::client::{HeadersClient, HeadersDirection, HeadersFut, HeadersRequest},
13 priority::Priority,
14 BlockClient,
15};
16use reth_network_peers::PeerId;
17use reth_primitives_traits::{Block, BlockBody, FullBlock, SealedBlock, SealedHeader};
18use std::{collections::HashMap, io, ops::RangeInclusive, path::Path, sync::Arc};
19use thiserror::Error;
20use tokio::{
21 fs::File,
22 io::{AsyncReadExt, BufReader},
23};
24use tokio_stream::StreamExt;
25use tokio_util::codec::FramedRead;
26use tracing::{debug, trace, warn};
27
28use super::file_codec::BlockFileCodec;
29use crate::receipt_file_client::FromReceiptReader;
30
31pub const DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE: u64 = 1_000_000_000;
35
36#[derive(Debug, Clone)]
48pub struct FileClient<B: Block> {
49 headers: HashMap<BlockNumber, B::Header>,
51
52 hash_to_number: HashMap<BlockHash, BlockNumber>,
54
55 bodies: HashMap<BlockHash, B::Body>,
57}
58
59#[derive(Debug, Error)]
61pub enum FileClientError {
62 #[error(transparent)]
64 Consensus(#[from] ConsensusError),
65
66 #[error(transparent)]
68 Io(#[from] std::io::Error),
69
70 #[error("{0}")]
72 Rlp(alloy_rlp::Error, Vec<u8>),
73
74 #[error("{0}")]
76 Custom(&'static str),
77}
78
79impl From<&'static str> for FileClientError {
80 fn from(value: &'static str) -> Self {
81 Self::Custom(value)
82 }
83}
84
85impl<B: FullBlock> FileClient<B> {
86 pub fn from_blocks(blocks: impl IntoIterator<Item = SealedBlock<B>>) -> Self {
88 let blocks: Vec<_> = blocks.into_iter().collect();
89 let capacity = blocks.len();
90
91 let mut headers = HashMap::with_capacity(capacity);
92 let mut hash_to_number = HashMap::with_capacity(capacity);
93 let mut bodies = HashMap::with_capacity(capacity);
94
95 for block in blocks {
96 let number = block.number();
97 let hash = block.hash();
98 let (header, body) = block.split_sealed_header_body();
99
100 headers.insert(number, header.into_header());
101 hash_to_number.insert(hash, number);
102 bodies.insert(hash, body);
103 }
104
105 Self { headers, hash_to_number, bodies }
106 }
107
108 pub async fn new<P: AsRef<Path>>(
110 path: P,
111 consensus: Arc<dyn Consensus<B>>,
112 ) -> Result<Self, FileClientError> {
113 let file = File::open(path).await?;
114 Self::from_file(file, consensus).await
115 }
116
117 pub(crate) async fn from_file(
119 mut file: File,
120 consensus: Arc<dyn Consensus<B>>,
121 ) -> Result<Self, FileClientError> {
122 let metadata = file.metadata().await?;
124 let file_len = metadata.len();
125
126 let mut reader = vec![];
127 file.read_to_end(&mut reader).await?;
128
129 Ok(FileClientBuilder { consensus, parent_header: None, skip_invalid_blocks: false }
130 .build(&reader[..], file_len)
131 .await?
132 .file_client)
133 }
134
135 pub fn tip(&self) -> Option<B256> {
137 self.headers.get(&self.max_block()?).map(|h| h.hash_slow())
138 }
139
140 pub fn start(&self) -> Option<B256> {
142 self.headers.get(&self.min_block()?).map(|h| h.hash_slow())
143 }
144
145 pub fn max_block(&self) -> Option<u64> {
147 self.headers.keys().max().copied()
148 }
149
150 pub fn min_block(&self) -> Option<u64> {
152 self.headers.keys().min().copied()
153 }
154
155 pub fn tip_header(&self) -> Option<SealedHeader<B::Header>> {
158 self.headers.get(&self.max_block()?).map(|h| SealedHeader::seal_slow(h.clone()))
159 }
160
161 pub fn has_canonical_blocks(&self) -> bool {
163 if self.headers.is_empty() {
164 return true
165 }
166 let (min, max) = self.headers.keys().minmax().into_option().expect("not empty");
167 *max - *min + 1 == self.headers.len() as u64
169 }
170
171 pub fn with_bodies(mut self, bodies: HashMap<BlockHash, B::Body>) -> Self {
173 self.bodies = bodies;
174 self
175 }
176
177 pub fn with_headers(mut self, headers: HashMap<BlockNumber, B::Header>) -> Self {
179 self.headers = headers;
180 for (number, header) in &self.headers {
181 self.hash_to_number.insert(header.hash_slow(), *number);
182 }
183 self
184 }
185
186 pub fn headers_len(&self) -> usize {
188 self.headers.len()
189 }
190
191 pub fn bodies_len(&self) -> usize {
193 self.bodies.len()
194 }
195
196 pub fn headers_iter(&self) -> impl Iterator<Item = &B::Header> {
198 self.headers.values()
199 }
200
201 pub fn bodies_iter_mut(&mut self) -> impl Iterator<Item = (u64, &mut B::Body)> {
205 let bodies = &mut self.bodies;
206 let numbers = &self.hash_to_number;
207 bodies.iter_mut().map(|(hash, body)| (numbers[hash], body))
208 }
209
210 pub fn total_transactions(&self) -> usize {
212 self.bodies.iter().fold(0, |acc, (_, body)| acc + body.transactions().len())
213 }
214}
215
216struct FileClientBuilder<B: Block> {
217 pub consensus: Arc<dyn Consensus<B>>,
218 pub parent_header: Option<SealedHeader<B::Header>>,
219 pub skip_invalid_blocks: bool,
220}
221
222impl<B: FullBlock<Header: reth_primitives_traits::BlockHeader>> FromReader
223 for FileClientBuilder<B>
224{
225 type Error = FileClientError;
226 type Output = FileClient<B>;
227
228 fn build<R>(
230 &self,
231 reader: R,
232 num_bytes: u64,
233 ) -> impl Future<Output = Result<DecodedFileChunk<Self::Output>, Self::Error>>
234 where
235 R: AsyncReadExt + Unpin,
236 {
237 let mut headers = HashMap::default();
238 let mut hash_to_number = HashMap::default();
239 let mut bodies = HashMap::default();
240
241 let mut stream =
243 FramedRead::with_capacity(reader, BlockFileCodec::<B>::default(), num_bytes as usize);
244
245 trace!(target: "downloaders::file",
246 target_num_bytes=num_bytes,
247 capacity=stream.read_buffer().capacity(),
248 "init decode stream"
249 );
250
251 let mut remaining_bytes = vec![];
252
253 let mut log_interval = 0;
254 let mut log_interval_start_block = 0;
255
256 let mut parent_header = self.parent_header.clone();
257
258 async move {
259 while let Some(block_res) = stream.next().await {
260 let block = match block_res {
261 Ok(block) => block,
262 Err(FileClientError::Rlp(err, bytes)) => {
263 trace!(target: "downloaders::file",
264 %err,
265 bytes_len=bytes.len(),
266 "partial block returned from decoding chunk"
267 );
268 remaining_bytes = bytes;
269 break
270 }
271 Err(err) => return Err(err),
272 };
273
274 let block = SealedBlock::seal_slow(block);
275
276 let validation =
281 self.consensus.validate_header(block.sealed_header()).and_then(|_| {
282 if let Some(parent) = &parent_header {
283 self.consensus
284 .validate_header_against_parent(block.sealed_header(), parent)?;
285 }
286 self.consensus.validate_block_pre_execution(&block)
287 });
288 if let Err(err) = validation {
289 if !self.skip_invalid_blocks {
290 return Err(err.into())
291 }
292 warn!(target: "downloaders::file",
293 block_number = block.number(),
294 block_hash = %block.hash(),
295 %err,
296 "skipping invalid block while decoding file"
297 );
298 continue
299 }
300 if parent_header.is_some() {
301 parent_header = Some(block.sealed_header().clone());
302 }
303
304 let block_hash = block.hash();
306 let block_number = block.number();
307 let (header, body) = block.split_sealed_header_body();
308 headers.insert(block_number, header.unseal());
309 hash_to_number.insert(block_hash, block_number);
310 bodies.insert(block_hash, body);
311
312 if log_interval == 0 {
313 trace!(target: "downloaders::file",
314 block_number,
315 "read first block"
316 );
317 log_interval_start_block = block_number;
318 } else if log_interval % 100_000 == 0 {
319 trace!(target: "downloaders::file",
320 blocks=?log_interval_start_block..=block_number,
321 "read blocks from file"
322 );
323 log_interval_start_block = block_number + 1;
324 }
325 log_interval += 1;
326 }
327
328 trace!(target: "downloaders::file", blocks = headers.len(), "Initialized file client");
329
330 Ok(DecodedFileChunk {
331 file_client: FileClient { headers, hash_to_number, bodies },
332 remaining_bytes,
333 highest_block: None,
334 })
335 }
336 }
337}
338
339impl<B: FullBlock> HeadersClient for FileClient<B> {
340 type Header = B::Header;
341 type Output = HeadersFut<B::Header>;
342
343 fn get_headers_with_priority(
344 &self,
345 request: HeadersRequest,
346 _priority: Priority,
347 ) -> Self::Output {
348 let mut headers = Vec::new();
350 trace!(target: "downloaders::file", request=?request, "Getting headers");
351
352 let start_num = match request.start {
353 BlockHashOrNumber::Hash(hash) => match self.hash_to_number.get(&hash) {
354 Some(num) => *num,
355 None => {
356 warn!(%hash, "Could not find starting block number for requested header hash");
357 return Box::pin(async move { Err(RequestError::BadResponse) })
358 }
359 },
360 BlockHashOrNumber::Number(num) => num,
361 };
362
363 let range = if request.limit == 1 {
364 Either::Left(start_num..start_num + 1)
365 } else {
366 match request.direction {
367 HeadersDirection::Rising => Either::Left(start_num..start_num + request.limit),
368 HeadersDirection::Falling => {
369 Either::Right((start_num - request.limit + 1..=start_num).rev())
370 }
371 }
372 };
373
374 trace!(target: "downloaders::file", range=?range, "Getting headers with range");
375
376 for block_number in range {
377 match self.headers.get(&block_number).cloned() {
378 Some(header) => headers.push(header),
379 None => {
380 warn!(number=%block_number, "Could not find header");
381 return Box::pin(async move { Err(RequestError::BadResponse) })
382 }
383 }
384 }
385
386 Box::pin(async move { Ok((PeerId::default(), headers).into()) })
387 }
388}
389
390impl<B: FullBlock> BodiesClient for FileClient<B> {
391 type Body = B::Body;
392 type Output = BodiesFut<B::Body>;
393
394 fn get_block_bodies_with_priority_and_range_hint(
395 &self,
396 hashes: Vec<B256>,
397 _priority: Priority,
398 _range_hint: Option<RangeInclusive<u64>>,
399 ) -> Self::Output {
400 let mut bodies = Vec::new();
402
403 for hash in hashes {
406 match self.bodies.get(&hash).cloned() {
407 Some(body) => bodies.push(body),
408 None => return Box::pin(async move { Err(RequestError::BadResponse) }),
409 }
410 }
411
412 Box::pin(async move { Ok((PeerId::default(), bodies).into()) })
413 }
414}
415
416impl<B: FullBlock> DownloadClient for FileClient<B> {
417 fn report_bad_message(&self, _peer_id: PeerId) {
418 trace!("Reported a bad message on a file client, the file may be corrupted or invalid");
419 }
421
422 fn num_connected_peers(&self) -> usize {
423 1
425 }
426}
427
428impl<B: FullBlock> BlockClient for FileClient<B> {
429 type Block = B;
430}
431
432#[derive(Debug)]
434enum FileReader {
435 Plain { file: File, remaining_bytes: u64 },
437 Gzip(GzipDecoder<BufReader<File>>),
439}
440
441impl FileReader {
442 async fn read(&mut self, buf: &mut [u8]) -> Result<usize, io::Error> {
444 match self {
445 Self::Plain { file, .. } => file.read(buf).await,
446 Self::Gzip(decoder) => decoder.read(buf).await,
447 }
448 }
449
450 async fn read_next_chunk(
453 &mut self,
454 chunk: &mut Vec<u8>,
455 chunk_byte_len: u64,
456 ) -> Result<Option<u64>, FileClientError> {
457 match self {
458 Self::Plain { .. } => self.read_plain_chunk(chunk, chunk_byte_len).await,
459 Self::Gzip(_) => {
460 Ok((self.read_gzip_chunk(chunk, chunk_byte_len).await?)
461 .then_some(chunk.len() as u64))
462 }
463 }
464 }
465
466 async fn read_plain_chunk(
467 &mut self,
468 chunk: &mut Vec<u8>,
469 chunk_byte_len: u64,
470 ) -> Result<Option<u64>, FileClientError> {
471 let Self::Plain { file, remaining_bytes } = self else {
472 unreachable!("read_plain_chunk should only be called on Plain variant")
473 };
474
475 if *remaining_bytes == 0 && chunk.is_empty() {
476 return Ok(None)
478 }
479
480 let chunk_target_len = chunk_byte_len.min(*remaining_bytes + chunk.len() as u64);
481 let old_bytes_len = chunk.len() as u64;
482
483 let new_read_bytes_target_len = chunk_target_len - old_bytes_len;
485
486 let prev_read_bytes_len = chunk.len();
488 chunk.extend(std::iter::repeat_n(0, new_read_bytes_target_len as usize));
489 let reader = &mut chunk[prev_read_bytes_len..];
490
491 let new_read_bytes_len = file.read_exact(reader).await? as u64;
493 let next_chunk_byte_len = chunk.len();
494
495 *remaining_bytes -= new_read_bytes_len;
497
498 debug!(target: "downloaders::file",
499 max_chunk_byte_len=chunk_byte_len,
500 prev_read_bytes_len,
501 new_read_bytes_target_len,
502 new_read_bytes_len,
503 next_chunk_byte_len,
504 remaining_file_byte_len=*remaining_bytes,
505 "new bytes were read from file"
506 );
507
508 Ok(Some(next_chunk_byte_len as u64))
509 }
510
511 async fn read_gzip_chunk(
513 &mut self,
514 chunk: &mut Vec<u8>,
515 chunk_byte_len: u64,
516 ) -> Result<bool, FileClientError> {
517 let mut buffer = vec![0u8; 64 * 1024];
518 loop {
519 if chunk.len() >= chunk_byte_len as usize {
520 return Ok(true)
521 }
522
523 match self.read(&mut buffer).await {
524 Ok(0) => return Ok(!chunk.is_empty()),
525 Ok(n) => {
526 chunk.extend_from_slice(&buffer[..n]);
527 }
528 Err(e) => return Err(e.into()),
529 }
530 }
531 }
532}
533
534#[derive(Debug)]
536pub struct ChunkedFileReader {
537 file: FileReader,
539 chunk: Vec<u8>,
541 chunk_byte_len: u64,
543 highest_block: Option<u64>,
546}
547
548impl ChunkedFileReader {
549 pub async fn new<P: AsRef<Path>>(
553 path: P,
554 chunk_byte_len: Option<u64>,
555 ) -> Result<Self, FileClientError> {
556 let path = path.as_ref();
557 let file = File::open(path).await?;
558 let chunk_byte_len = chunk_byte_len.unwrap_or(DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE);
559
560 Self::from_file(
561 file,
562 chunk_byte_len,
563 path.extension()
564 .and_then(|ext| ext.to_str())
565 .is_some_and(|ext| ["gz", "gzip"].contains(&ext)),
566 )
567 .await
568 }
569
570 pub async fn from_file(
572 file: File,
573 chunk_byte_len: u64,
574 is_gzip: bool,
575 ) -> Result<Self, FileClientError> {
576 let file_reader = if is_gzip {
577 FileReader::Gzip(GzipDecoder::new(BufReader::new(file)))
578 } else {
579 let remaining_bytes = file.metadata().await?.len();
580 FileReader::Plain { file, remaining_bytes }
581 };
582
583 Ok(Self { file: file_reader, chunk: vec![], chunk_byte_len, highest_block: None })
584 }
585
586 async fn read_next_chunk(&mut self) -> Result<Option<u64>, FileClientError> {
589 self.file.read_next_chunk(&mut self.chunk, self.chunk_byte_len).await
590 }
591
592 pub async fn next_chunk<B: FullBlock>(
597 &mut self,
598 consensus: Arc<dyn Consensus<B>>,
599 parent_header: Option<SealedHeader<B::Header>>,
600 ) -> Result<Option<FileClient<B>>, FileClientError> {
601 self.next_chunk_with_invalid_block_handling(consensus, parent_header, false).await
602 }
603
604 pub async fn next_chunk_with_invalid_block_handling<B: FullBlock>(
606 &mut self,
607 consensus: Arc<dyn Consensus<B>>,
608 parent_header: Option<SealedHeader<B::Header>>,
609 skip_invalid_blocks: bool,
610 ) -> Result<Option<FileClient<B>>, FileClientError> {
611 let Some(chunk_len) = self.read_next_chunk().await? else { return Ok(None) };
612
613 let DecodedFileChunk { file_client, remaining_bytes, .. } =
615 FileClientBuilder { consensus, parent_header, skip_invalid_blocks }
616 .build(&self.chunk[..], chunk_len)
617 .await?;
618
619 self.chunk = remaining_bytes;
621
622 Ok(Some(file_client))
623 }
624
625 pub async fn next_receipts_chunk<T>(&mut self) -> Result<Option<T>, T::Error>
627 where
628 T: FromReceiptReader,
629 {
630 let Some(next_chunk_byte_len) = self.read_next_chunk().await.map_err(|e| {
631 T::Error::from(match e {
632 FileClientError::Io(io_err) => io_err,
633 _ => io::Error::other(e.to_string()),
634 })
635 })?
636 else {
637 return Ok(None)
638 };
639
640 let DecodedFileChunk { file_client, remaining_bytes, highest_block } =
642 T::from_receipt_reader(&self.chunk[..], next_chunk_byte_len, self.highest_block)
643 .await?;
644
645 self.chunk = remaining_bytes;
647 self.highest_block = highest_block;
649
650 Ok(Some(file_client))
651 }
652}
653
654pub trait FromReader {
656 type Error: From<io::Error>;
658
659 type Output;
661
662 fn build<R>(
664 &self,
665 reader: R,
666 num_bytes: u64,
667 ) -> impl Future<Output = Result<DecodedFileChunk<Self::Output>, Self::Error>>
668 where
669 Self: Sized,
670 R: AsyncReadExt + Unpin;
671}
672
673#[derive(Debug)]
675pub struct DecodedFileChunk<T> {
676 pub file_client: T,
678 pub remaining_bytes: Vec<u8>,
680 pub highest_block: Option<u64>,
683}
684
685#[cfg(test)]
686mod tests {
687 use super::*;
688 use crate::{
689 bodies::{
690 bodies::BodiesDownloaderBuilder,
691 test_utils::{insert_headers, zip_blocks},
692 },
693 headers::{reverse_headers::ReverseHeadersDownloaderBuilder, test_utils::child_header},
694 test_utils::{generate_bodies, generate_bodies_file},
695 };
696 use assert_matches::assert_matches;
697 use async_compression::tokio::write::GzipEncoder;
698 use futures_util::stream::StreamExt;
699 use rand::Rng;
700 use reth_consensus::{noop::NoopConsensus, test_utils::TestConsensus, ConsensusError};
701 use reth_ethereum_primitives::Block;
702 use reth_network_p2p::{
703 bodies::downloader::BodyDownloader,
704 headers::downloader::{HeaderDownloader, SyncTarget},
705 };
706 use reth_provider::test_utils::create_test_provider_factory;
707 use std::sync::Arc;
708 use tokio::{
709 fs::File,
710 io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom},
711 };
712
713 #[tokio::test]
714 async fn streams_bodies_from_buffer() {
715 let factory = create_test_provider_factory();
717 let (headers, mut bodies) = generate_bodies(0..=19);
718
719 insert_headers(&factory, &headers);
720
721 let file = tempfile::tempfile().unwrap();
723
724 let client: Arc<FileClient<Block>> = Arc::new(
725 FileClient::from_file(file.into(), NoopConsensus::arc())
726 .await
727 .unwrap()
728 .with_bodies(bodies.clone().into_iter().collect()),
729 );
730 let mut downloader = BodiesDownloaderBuilder::default().build::<Block, _, _>(
731 client.clone(),
732 Arc::new(TestConsensus::default()),
733 factory,
734 );
735 downloader.set_download_range(0..=19).expect("failed to set download range");
736
737 assert_matches!(
738 downloader.next().await,
739 Some(Ok(res)) => assert_eq!(res, zip_blocks(headers.iter(), &mut bodies))
740 );
741 }
742
743 #[tokio::test]
744 async fn download_headers_at_fork_head() {
745 reth_tracing::init_test_tracing();
746
747 let p3 = SealedHeader::default();
748 let p2 = child_header(&p3);
749 let p1 = child_header(&p2);
750 let p0 = child_header(&p1);
751
752 let file = tempfile::tempfile().unwrap();
753 let client: Arc<FileClient<Block>> = Arc::new(
754 FileClient::from_file(file.into(), NoopConsensus::arc()).await.unwrap().with_headers(
755 HashMap::from([
756 (0u64, p0.clone_header()),
757 (1, p1.clone_header()),
758 (2, p2.clone_header()),
759 (3, p3.clone_header()),
760 ]),
761 ),
762 );
763
764 let mut downloader = ReverseHeadersDownloaderBuilder::default()
765 .stream_batch_size(3)
766 .request_limit(3)
767 .build(Arc::clone(&client), Arc::new(TestConsensus::default()));
768 downloader.update_local_head(p3.clone());
769 downloader.update_sync_target(SyncTarget::Tip(p0.hash()));
770
771 let headers = downloader.next().await.unwrap();
772 assert_eq!(headers.unwrap(), vec![p0, p1, p2]);
773 assert!(downloader.next().await.is_none());
774 assert!(downloader.next().await.is_none());
775 }
776
777 #[tokio::test]
778 async fn test_download_headers_from_file() {
779 reth_tracing::init_test_tracing();
780
781 let (file, headers, _) = generate_bodies_file(0..=19).await;
783 let client: Arc<FileClient<Block>> =
785 Arc::new(FileClient::from_file(file, NoopConsensus::arc()).await.unwrap());
786
787 let mut header_downloader = ReverseHeadersDownloaderBuilder::default()
789 .build(Arc::clone(&client), Arc::new(TestConsensus::default()));
790 header_downloader.update_local_head(headers.first().unwrap().clone());
791 header_downloader.update_sync_target(SyncTarget::Tip(headers.last().unwrap().hash()));
792
793 let mut downloaded_headers = header_downloader.next().await.unwrap().unwrap();
795
796 downloaded_headers.reverse();
798
799 assert_eq!(downloaded_headers, headers[1..]);
801 }
802
803 #[tokio::test]
804 async fn test_download_bodies_from_file() {
805 let factory = create_test_provider_factory();
807 let (file, headers, mut bodies) = generate_bodies_file(0..=19).await;
808
809 let client: Arc<FileClient<Block>> =
811 Arc::new(FileClient::from_file(file, NoopConsensus::arc()).await.unwrap());
812
813 insert_headers(&factory, &headers);
815
816 let mut downloader = BodiesDownloaderBuilder::default().build::<Block, _, _>(
817 client.clone(),
818 Arc::new(TestConsensus::default()),
819 factory,
820 );
821 downloader.set_download_range(0..=19).expect("failed to set download range");
822
823 assert_matches!(
824 downloader.next().await,
825 Some(Ok(res)) => assert_eq!(res, zip_blocks(headers.iter(), &mut bodies))
826 );
827 }
828
829 #[tokio::test]
830 async fn strict_chunk_decode_fails_on_invalid_block() {
831 let (file, _, _) = generate_bodies_file(0..=2).await;
832 let chunk_byte_len = file.metadata().await.unwrap().len();
833 let mut reader = ChunkedFileReader::from_file(file, chunk_byte_len, false).await.unwrap();
834 let consensus = Arc::new(TestConsensus::default());
835 consensus.set_fail_validation(true);
836
837 let err = reader.next_chunk::<Block>(consensus, None).await.unwrap_err();
838
839 assert_matches!(err, FileClientError::Consensus(ConsensusError::BaseFeeMissing));
840 }
841
842 #[tokio::test]
843 async fn lenient_chunk_decode_skips_invalid_blocks() {
844 let (file, _, _) = generate_bodies_file(0..=2).await;
845 let chunk_byte_len = file.metadata().await.unwrap().len();
846 let mut reader = ChunkedFileReader::from_file(file, chunk_byte_len, false).await.unwrap();
847 let consensus = Arc::new(TestConsensus::default());
848 consensus.set_fail_validation(true);
849
850 let client = reader
851 .next_chunk_with_invalid_block_handling::<Block>(consensus, None, true)
852 .await
853 .unwrap()
854 .unwrap();
855
856 assert_eq!(client.headers_len(), 0);
857 assert!(client.tip().is_none());
858 }
859
860 #[tokio::test]
861 async fn test_chunk_download_headers_from_file() {
862 reth_tracing::init_test_tracing();
863
864 let (file, headers, _) = generate_bodies_file(0..=14).await;
866
867 let chunk_byte_len = rand::rng().random_range(2000..=10_000);
870 trace!(target: "downloaders::file::test", chunk_byte_len);
871
872 let mut reader =
874 ChunkedFileReader::from_file(file, chunk_byte_len as u64, false).await.unwrap();
875
876 let mut downloaded_headers: Vec<SealedHeader> = vec![];
877
878 let mut local_header = headers.first().unwrap().clone();
879
880 while let Some(client) =
882 reader.next_chunk::<Block>(NoopConsensus::arc(), None).await.unwrap()
883 {
884 let sync_target = client.tip_header().unwrap();
885
886 let sync_target_hash = sync_target.hash();
887
888 let mut header_downloader = ReverseHeadersDownloaderBuilder::default()
890 .build(Arc::new(client), Arc::new(TestConsensus::default()));
891 header_downloader.update_local_head(local_header.clone());
892 header_downloader.update_sync_target(SyncTarget::Tip(sync_target_hash));
893
894 let mut downloaded_headers_chunk = header_downloader.next().await.unwrap().unwrap();
896
897 local_header = sync_target;
899
900 downloaded_headers_chunk.reverse();
902 downloaded_headers.extend_from_slice(&downloaded_headers_chunk);
903 }
904
905 assert_eq!(headers[1..], downloaded_headers);
907 }
908
909 #[tokio::test]
910 async fn test_chunk_download_headers_from_gzip_file() {
911 reth_tracing::init_test_tracing();
912
913 let (file, headers, _) = generate_bodies_file(0..=14).await;
915
916 let gzip_temp_file = tempfile::NamedTempFile::new().unwrap();
918 let gzip_path = gzip_temp_file.path().to_owned();
919 drop(gzip_temp_file); let mut original_file = file;
923 original_file.seek(SeekFrom::Start(0)).await.unwrap();
924 let mut original_content = Vec::new();
925 original_file.read_to_end(&mut original_content).await.unwrap();
926
927 let mut gzip_file = File::create(&gzip_path).await.unwrap();
928 let mut encoder = GzipEncoder::new(&mut gzip_file);
929
930 encoder.write_all(&original_content).await.unwrap();
932 encoder.shutdown().await.unwrap();
933 drop(gzip_file);
934
935 let gzip_file = File::open(&gzip_path).await.unwrap();
937
938 let chunk_byte_len = rand::rng().random_range(2000..=10_000);
941 trace!(target: "downloaders::file::test", chunk_byte_len);
942
943 let mut reader =
945 ChunkedFileReader::from_file(gzip_file, chunk_byte_len as u64, true).await.unwrap();
946
947 let mut downloaded_headers: Vec<SealedHeader> = vec![];
948
949 let mut local_header = headers.first().unwrap().clone();
950
951 while let Some(client) =
953 reader.next_chunk::<Block>(NoopConsensus::arc(), None).await.unwrap()
954 {
955 if client.headers_len() == 0 {
956 continue;
957 }
958
959 let sync_target = client.tip_header().expect("tip_header should not be None");
960
961 let sync_target_hash = sync_target.hash();
962
963 let mut header_downloader = ReverseHeadersDownloaderBuilder::default()
965 .build(Arc::new(client), Arc::new(TestConsensus::default()));
966 header_downloader.update_local_head(local_header.clone());
967 header_downloader.update_sync_target(SyncTarget::Tip(sync_target_hash));
968
969 let mut downloaded_headers_chunk = header_downloader.next().await.unwrap().unwrap();
971
972 local_header = sync_target;
974
975 downloaded_headers_chunk.reverse();
977 downloaded_headers.extend_from_slice(&downloaded_headers_chunk);
978 }
979
980 assert_eq!(headers[1..], downloaded_headers);
982 }
983}