Skip to main content

reth_downloaders/
file_client.rs

1use alloy_consensus::BlockHeader;
2use alloy_eips::BlockHashOrNumber;
3use alloy_primitives::{BlockHash, BlockNumber, Sealable, B256};
4use async_compression::tokio::bufread::GzipDecoder;
5use futures::Future;
6use itertools::{Either, Itertools};
7use reth_consensus::{Consensus, ConsensusError};
8use reth_network_p2p::{
9    bodies::client::{BodiesClient, BodiesFut},
10    download::DownloadClient,
11    error::RequestError,
12    headers::client::{HeadersClient, HeadersDirection, HeadersFut, HeadersRequest},
13    priority::Priority,
14    BlockClient,
15};
16use reth_network_peers::PeerId;
17use reth_primitives_traits::{Block, BlockBody, FullBlock, SealedBlock, SealedHeader};
18use std::{collections::HashMap, io, ops::RangeInclusive, path::Path, sync::Arc};
19use thiserror::Error;
20use tokio::{
21    fs::File,
22    io::{AsyncReadExt, BufReader},
23};
24use tokio_stream::StreamExt;
25use tokio_util::codec::FramedRead;
26use tracing::{debug, trace, warn};
27
28use super::file_codec::BlockFileCodec;
29use crate::receipt_file_client::FromReceiptReader;
30
/// Default byte length of chunk to read from chain file.
///
/// One decimal gigabyte, i.e. 10^9 bytes.
pub const DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE: u64 = 1000 * 1000 * 1000;
35
/// Front-end API for fetching chain data from a file.
///
/// Blocks are assumed to be written one after another in a file, as rlp bytes.
///
/// For example, if the file contains 3 blocks, the file is assumed to be encoded as follows:
/// rlp(block1) || rlp(block2) || rlp(block3)
///
/// Blocks are assumed to have populated transactions, so reading headers will also buffer
/// transactions in memory for use in the bodies stage.
///
/// [`FileClient::new`] reads the entire file into memory, so it is not suitable for large
/// files; [`ChunkedFileReader`] decodes a file into a sequence of smaller [`FileClient`]s
/// instead.
#[derive(Debug, Clone)]
pub struct FileClient<B: Block> {
    /// The buffered headers retrieved when fetching new bodies, keyed by block number.
    headers: HashMap<BlockNumber, B::Header>,

    /// A mapping between block hash and number.
    hash_to_number: HashMap<BlockHash, BlockNumber>,

    /// The buffered bodies retrieved when fetching new headers, keyed by block hash.
    bodies: HashMap<BlockHash, B::Body>,
}
58
/// An error that can occur when constructing and using a [`FileClient`].
#[derive(Debug, Error)]
pub enum FileClientError {
    /// An error occurred when validating a header or block from file.
    #[error(transparent)]
    Consensus(#[from] ConsensusError),

    /// An error occurred when opening or reading the file.
    #[error(transparent)]
    Io(#[from] std::io::Error),

    /// An error occurred when decoding blocks, headers, or rlp headers from the file.
    ///
    /// The second field carries raw bytes. While a chunk of blocks is decoded in this module,
    /// this variant is not propagated as an error: its bytes become the chunk's undecoded
    /// remainder (e.g. a block cut off at the end of the chunk). Only the RLP error is shown in
    /// the display message.
    #[error("{0}")]
    Rlp(alloy_rlp::Error, Vec<u8>),

    /// Custom error message.
    #[error("{0}")]
    Custom(&'static str),
}
78
79impl From<&'static str> for FileClientError {
80    fn from(value: &'static str) -> Self {
81        Self::Custom(value)
82    }
83}
84
85impl<B: FullBlock> FileClient<B> {
86    /// Create a new file client from a slice of sealed blocks.
87    pub fn from_blocks(blocks: impl IntoIterator<Item = SealedBlock<B>>) -> Self {
88        let blocks: Vec<_> = blocks.into_iter().collect();
89        let capacity = blocks.len();
90
91        let mut headers = HashMap::with_capacity(capacity);
92        let mut hash_to_number = HashMap::with_capacity(capacity);
93        let mut bodies = HashMap::with_capacity(capacity);
94
95        for block in blocks {
96            let number = block.number();
97            let hash = block.hash();
98            let (header, body) = block.split_sealed_header_body();
99
100            headers.insert(number, header.into_header());
101            hash_to_number.insert(hash, number);
102            bodies.insert(hash, body);
103        }
104
105        Self { headers, hash_to_number, bodies }
106    }
107
108    /// Create a new file client from a file path.
109    pub async fn new<P: AsRef<Path>>(
110        path: P,
111        consensus: Arc<dyn Consensus<B>>,
112    ) -> Result<Self, FileClientError> {
113        let file = File::open(path).await?;
114        Self::from_file(file, consensus).await
115    }
116
117    /// Initialize the [`FileClient`] with a file directly.
118    pub(crate) async fn from_file(
119        mut file: File,
120        consensus: Arc<dyn Consensus<B>>,
121    ) -> Result<Self, FileClientError> {
122        // get file len from metadata before reading
123        let metadata = file.metadata().await?;
124        let file_len = metadata.len();
125
126        let mut reader = vec![];
127        file.read_to_end(&mut reader).await?;
128
129        Ok(FileClientBuilder { consensus, parent_header: None, skip_invalid_blocks: false }
130            .build(&reader[..], file_len)
131            .await?
132            .file_client)
133    }
134
135    /// Get the tip hash of the chain.
136    pub fn tip(&self) -> Option<B256> {
137        self.headers.get(&self.max_block()?).map(|h| h.hash_slow())
138    }
139
140    /// Get the start hash of the chain.
141    pub fn start(&self) -> Option<B256> {
142        self.headers.get(&self.min_block()?).map(|h| h.hash_slow())
143    }
144
145    /// Returns the highest block number of this client has or `None` if empty
146    pub fn max_block(&self) -> Option<u64> {
147        self.headers.keys().max().copied()
148    }
149
150    /// Returns the lowest block number of this client has or `None` if empty
151    pub fn min_block(&self) -> Option<u64> {
152        self.headers.keys().min().copied()
153    }
154
155    /// Clones and returns the highest header of this client has or `None` if empty. Seals header
156    /// before returning.
157    pub fn tip_header(&self) -> Option<SealedHeader<B::Header>> {
158        self.headers.get(&self.max_block()?).map(|h| SealedHeader::seal_slow(h.clone()))
159    }
160
161    /// Returns true if all blocks are canonical (no gaps)
162    pub fn has_canonical_blocks(&self) -> bool {
163        if self.headers.is_empty() {
164            return true
165        }
166        let (min, max) = self.headers.keys().minmax().into_option().expect("not empty");
167        // Contiguous range from min to max means no gaps
168        *max - *min + 1 == self.headers.len() as u64
169    }
170
171    /// Use the provided bodies as the file client's block body buffer.
172    pub fn with_bodies(mut self, bodies: HashMap<BlockHash, B::Body>) -> Self {
173        self.bodies = bodies;
174        self
175    }
176
177    /// Use the provided headers as the file client's block body buffer.
178    pub fn with_headers(mut self, headers: HashMap<BlockNumber, B::Header>) -> Self {
179        self.headers = headers;
180        for (number, header) in &self.headers {
181            self.hash_to_number.insert(header.hash_slow(), *number);
182        }
183        self
184    }
185
186    /// Returns the current number of headers in the client.
187    pub fn headers_len(&self) -> usize {
188        self.headers.len()
189    }
190
191    /// Returns the current number of bodies in the client.
192    pub fn bodies_len(&self) -> usize {
193        self.bodies.len()
194    }
195
196    /// Returns an iterator over headers in the client.
197    pub fn headers_iter(&self) -> impl Iterator<Item = &B::Header> {
198        self.headers.values()
199    }
200
201    /// Returns a mutable iterator over bodies in the client.
202    ///
203    /// Panics, if file client headers and bodies are not mapping 1-1.
204    pub fn bodies_iter_mut(&mut self) -> impl Iterator<Item = (u64, &mut B::Body)> {
205        let bodies = &mut self.bodies;
206        let numbers = &self.hash_to_number;
207        bodies.iter_mut().map(|(hash, body)| (numbers[hash], body))
208    }
209
210    /// Returns the current number of transactions in the client.
211    pub fn total_transactions(&self) -> usize {
212        self.bodies.iter().fold(0, |acc, (_, body)| acc + body.transactions().len())
213    }
214}
215
/// Decodes raw block bytes into a [`FileClient`], checking every block with `consensus`.
struct FileClientBuilder<B: Block> {
    /// Consensus implementation used for the pre-execution checks of every decoded block.
    pub consensus: Arc<dyn Consensus<B>>,
    /// Header preceding the first block of the input, if known. When set, each decoded block is
    /// also validated against the previously accepted block, starting from this header; when
    /// `None`, no parent validation is performed for the input.
    pub parent_header: Option<SealedHeader<B::Header>>,
    /// If `true`, blocks failing the consensus checks are logged and skipped instead of aborting
    /// the decode with an error.
    pub skip_invalid_blocks: bool,
}
221
impl<B: FullBlock<Header: reth_primitives_traits::BlockHeader>> FromReader
    for FileClientBuilder<B>
{
    type Error = FileClientError;
    type Output = FileClient<B>;

    /// Initialize the [`FileClient`] from bytes that have been read from file.
    ///
    /// Decodes blocks from `reader` until the stream ends or a block cannot be decoded. The bytes
    /// of a block that could not be decoded (e.g. a block cut off at the end of a chunk) are
    /// returned in [`DecodedFileChunk::remaining_bytes`] rather than as an error.
    ///
    /// Every decoded block is checked with the builder's consensus implementation. A failing
    /// block aborts decoding with [`FileClientError::Consensus`] unless `skip_invalid_blocks` is
    /// set, in which case it is logged and dropped.
    ///
    /// `num_bytes` is used as the capacity of the decoder's read buffer.
    fn build<R>(
        &self,
        reader: R,
        num_bytes: u64,
    ) -> impl Future<Output = Result<DecodedFileChunk<Self::Output>, Self::Error>>
    where
        R: AsyncReadExt + Unpin,
    {
        let mut headers = HashMap::default();
        let mut hash_to_number = HashMap::default();
        let mut bodies = HashMap::default();

        // use with_capacity to make sure the internal buffer contains the entire chunk
        let mut stream =
            FramedRead::with_capacity(reader, BlockFileCodec::<B>::default(), num_bytes as usize);

        trace!(target: "downloaders::file",
            target_num_bytes=num_bytes,
            capacity=stream.read_buffer().capacity(),
            "init decode stream"
        );

        let mut remaining_bytes = vec![];

        let mut log_interval = 0;
        let mut log_interval_start_block = 0;

        let mut parent_header = self.parent_header.clone();

        // All decoding happens inside the returned future; nothing is read until it is awaited.
        async move {
            while let Some(block_res) = stream.next().await {
                let block = match block_res {
                    Ok(block) => block,
                    // A decode failure ends this chunk: the undecoded bytes are handed back to
                    // the caller as the remainder instead of failing the whole decode.
                    Err(FileClientError::Rlp(err, bytes)) => {
                        trace!(target: "downloaders::file",
                            %err,
                            bytes_len=bytes.len(),
                            "partial block returned from decoding chunk"
                        );
                        remaining_bytes = bytes;
                        break
                    }
                    Err(err) => return Err(err),
                };

                let block = SealedBlock::seal_slow(block);

                // Run consensus pre-checks. An invalid block here (e.g. mid-file in a
                // BlockchainTest sequence that intentionally interleaves invalid block proposals
                // with the valid chain) is not a hard failure: skip the block and keep decoding
                // so the pipeline can still apply the valid prefix.
                let validation =
                    self.consensus.validate_header(block.sealed_header()).and_then(|_| {
                        if let Some(parent) = &parent_header {
                            self.consensus
                                .validate_header_against_parent(block.sealed_header(), parent)?;
                        }
                        self.consensus.validate_block_pre_execution(&block)
                    });
                if let Err(err) = validation {
                    if !self.skip_invalid_blocks {
                        return Err(err.into())
                    }
                    warn!(target: "downloaders::file",
                        block_number = block.number(),
                        block_hash = %block.hash(),
                        %err,
                        "skipping invalid block while decoding file"
                    );
                    continue
                }
                // The parent is only advanced when the caller supplied one: without an initial
                // parent header, no block of this input is validated against its predecessor.
                // NOTE(review): confirm this is intended for the first chunk of a file.
                if parent_header.is_some() {
                    parent_header = Some(block.sealed_header().clone());
                }

                // add to the internal maps; a later block with an already-seen number replaces
                // that header, while its body and hash mapping are added alongside the old ones
                let block_hash = block.hash();
                let block_number = block.number();
                let (header, body) = block.split_sealed_header_body();
                headers.insert(block_number, header.unseal());
                hash_to_number.insert(block_hash, block_number);
                bodies.insert(block_hash, body);

                // progress logging: once for the first block, then every 100_000 blocks
                if log_interval == 0 {
                    trace!(target: "downloaders::file",
                        block_number,
                        "read first block"
                    );
                    log_interval_start_block = block_number;
                } else if log_interval % 100_000 == 0 {
                    trace!(target: "downloaders::file",
                        blocks=?log_interval_start_block..=block_number,
                        "read blocks from file"
                    );
                    log_interval_start_block = block_number + 1;
                }
                log_interval += 1;
            }

            trace!(target: "downloaders::file", blocks = headers.len(), "Initialized file client");

            Ok(DecodedFileChunk {
                file_client: FileClient { headers, hash_to_number, bodies },
                remaining_bytes,
                // block decoding does not track the highest block
                highest_block: None,
            })
        }
    }
}
338
339impl<B: FullBlock> HeadersClient for FileClient<B> {
340    type Header = B::Header;
341    type Output = HeadersFut<B::Header>;
342
343    fn get_headers_with_priority(
344        &self,
345        request: HeadersRequest,
346        _priority: Priority,
347    ) -> Self::Output {
348        // this just searches the buffer, and fails if it can't find the header
349        let mut headers = Vec::new();
350        trace!(target: "downloaders::file", request=?request, "Getting headers");
351
352        let start_num = match request.start {
353            BlockHashOrNumber::Hash(hash) => match self.hash_to_number.get(&hash) {
354                Some(num) => *num,
355                None => {
356                    warn!(%hash, "Could not find starting block number for requested header hash");
357                    return Box::pin(async move { Err(RequestError::BadResponse) })
358                }
359            },
360            BlockHashOrNumber::Number(num) => num,
361        };
362
363        let range = if request.limit == 1 {
364            Either::Left(start_num..start_num + 1)
365        } else {
366            match request.direction {
367                HeadersDirection::Rising => Either::Left(start_num..start_num + request.limit),
368                HeadersDirection::Falling => {
369                    Either::Right((start_num - request.limit + 1..=start_num).rev())
370                }
371            }
372        };
373
374        trace!(target: "downloaders::file", range=?range, "Getting headers with range");
375
376        for block_number in range {
377            match self.headers.get(&block_number).cloned() {
378                Some(header) => headers.push(header),
379                None => {
380                    warn!(number=%block_number, "Could not find header");
381                    return Box::pin(async move { Err(RequestError::BadResponse) })
382                }
383            }
384        }
385
386        Box::pin(async move { Ok((PeerId::default(), headers).into()) })
387    }
388}
389
390impl<B: FullBlock> BodiesClient for FileClient<B> {
391    type Body = B::Body;
392    type Output = BodiesFut<B::Body>;
393
394    fn get_block_bodies_with_priority_and_range_hint(
395        &self,
396        hashes: Vec<B256>,
397        _priority: Priority,
398        _range_hint: Option<RangeInclusive<u64>>,
399    ) -> Self::Output {
400        // this just searches the buffer, and fails if it can't find the block
401        let mut bodies = Vec::new();
402
403        // check if any are an error
404        // could unwrap here
405        for hash in hashes {
406            match self.bodies.get(&hash).cloned() {
407                Some(body) => bodies.push(body),
408                None => return Box::pin(async move { Err(RequestError::BadResponse) }),
409            }
410        }
411
412        Box::pin(async move { Ok((PeerId::default(), bodies).into()) })
413    }
414}
415
416impl<B: FullBlock> DownloadClient for FileClient<B> {
417    fn report_bad_message(&self, _peer_id: PeerId) {
418        trace!("Reported a bad message on a file client, the file may be corrupted or invalid");
419        // noop
420    }
421
422    fn num_connected_peers(&self) -> usize {
423        // no such thing as connected peers when we are just using a file
424        1
425    }
426}
427
/// A [`FileClient`] serves blocks of its own block type `B`.
impl<B: FullBlock> BlockClient for FileClient<B> {
    type Block = B;
}
431
/// File reader type for handling different compression formats.
#[derive(Debug)]
enum FileReader {
    /// Regular uncompressed file with remaining byte tracking.
    ///
    /// `remaining_bytes` starts at the file length taken from its metadata and is decremented
    /// as bytes are read into chunks.
    Plain { file: File, remaining_bytes: u64 },
    /// Gzip compressed file, decompressed while reading.
    ///
    /// No remaining-byte count is tracked; the end of the data is detected when a read returns
    /// zero bytes.
    Gzip(GzipDecoder<BufReader<File>>),
}
440
441impl FileReader {
442    /// Read some data into the provided buffer, returning the number of bytes read.
443    async fn read(&mut self, buf: &mut [u8]) -> Result<usize, io::Error> {
444        match self {
445            Self::Plain { file, .. } => file.read(buf).await,
446            Self::Gzip(decoder) => decoder.read(buf).await,
447        }
448    }
449
450    /// Read next chunk from file. Returns the number of bytes read for plain files,
451    /// or a boolean indicating if data is available for gzip files.
452    async fn read_next_chunk(
453        &mut self,
454        chunk: &mut Vec<u8>,
455        chunk_byte_len: u64,
456    ) -> Result<Option<u64>, FileClientError> {
457        match self {
458            Self::Plain { .. } => self.read_plain_chunk(chunk, chunk_byte_len).await,
459            Self::Gzip(_) => {
460                Ok((self.read_gzip_chunk(chunk, chunk_byte_len).await?)
461                    .then_some(chunk.len() as u64))
462            }
463        }
464    }
465
466    async fn read_plain_chunk(
467        &mut self,
468        chunk: &mut Vec<u8>,
469        chunk_byte_len: u64,
470    ) -> Result<Option<u64>, FileClientError> {
471        let Self::Plain { file, remaining_bytes } = self else {
472            unreachable!("read_plain_chunk should only be called on Plain variant")
473        };
474
475        if *remaining_bytes == 0 && chunk.is_empty() {
476            // eof
477            return Ok(None)
478        }
479
480        let chunk_target_len = chunk_byte_len.min(*remaining_bytes + chunk.len() as u64);
481        let old_bytes_len = chunk.len() as u64;
482
483        // calculate reserved space in chunk
484        let new_read_bytes_target_len = chunk_target_len - old_bytes_len;
485
486        // read new bytes from file
487        let prev_read_bytes_len = chunk.len();
488        chunk.extend(std::iter::repeat_n(0, new_read_bytes_target_len as usize));
489        let reader = &mut chunk[prev_read_bytes_len..];
490
491        // actual bytes that have been read
492        let new_read_bytes_len = file.read_exact(reader).await? as u64;
493        let next_chunk_byte_len = chunk.len();
494
495        // update remaining file length
496        *remaining_bytes -= new_read_bytes_len;
497
498        debug!(target: "downloaders::file",
499            max_chunk_byte_len=chunk_byte_len,
500            prev_read_bytes_len,
501            new_read_bytes_target_len,
502            new_read_bytes_len,
503            next_chunk_byte_len,
504            remaining_file_byte_len=*remaining_bytes,
505            "new bytes were read from file"
506        );
507
508        Ok(Some(next_chunk_byte_len as u64))
509    }
510
511    /// Read next chunk from gzipped file.
512    async fn read_gzip_chunk(
513        &mut self,
514        chunk: &mut Vec<u8>,
515        chunk_byte_len: u64,
516    ) -> Result<bool, FileClientError> {
517        let mut buffer = vec![0u8; 64 * 1024];
518        loop {
519            if chunk.len() >= chunk_byte_len as usize {
520                return Ok(true)
521            }
522
523            match self.read(&mut buffer).await {
524                Ok(0) => return Ok(!chunk.is_empty()),
525                Ok(n) => {
526                    chunk.extend_from_slice(&buffer[..n]);
527                }
528                Err(e) => return Err(e.into()),
529            }
530        }
531    }
532}
533
/// Chunks file into several [`FileClient`]s.
///
/// Each call to [`ChunkedFileReader::next_chunk`] fills the internal buffer up to
/// `chunk_byte_len` bytes (gzip input may exceed this) and decodes it; undecoded bytes at the
/// end of a chunk (e.g. a partial block) are kept at the front of the next chunk.
#[derive(Debug)]
pub struct ChunkedFileReader {
    /// File reader (either plain or gzip).
    file: FileReader,
    /// Bytes that have been read but not yet decoded: the current chunk, starting with any
    /// leftover bytes of the previous chunk.
    chunk: Vec<u8>,
    /// Max bytes per chunk (a target for gzip input, which may overshoot it).
    chunk_byte_len: u64,
    /// Optionally, tracks highest decoded block number. Needed when decoding data that maps * to 1
    /// with block number
    highest_block: Option<u64>,
}
547
548impl ChunkedFileReader {
549    /// Opens the file to import from given path. Returns a new instance. If no chunk byte length
550    /// is passed, chunks have [`DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE`] (one static file).
551    /// Automatically detects gzip files by extension (.gz, .gzip).
552    pub async fn new<P: AsRef<Path>>(
553        path: P,
554        chunk_byte_len: Option<u64>,
555    ) -> Result<Self, FileClientError> {
556        let path = path.as_ref();
557        let file = File::open(path).await?;
558        let chunk_byte_len = chunk_byte_len.unwrap_or(DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE);
559
560        Self::from_file(
561            file,
562            chunk_byte_len,
563            path.extension()
564                .and_then(|ext| ext.to_str())
565                .is_some_and(|ext| ["gz", "gzip"].contains(&ext)),
566        )
567        .await
568    }
569
570    /// Opens the file to import from given path. Returns a new instance.
571    pub async fn from_file(
572        file: File,
573        chunk_byte_len: u64,
574        is_gzip: bool,
575    ) -> Result<Self, FileClientError> {
576        let file_reader = if is_gzip {
577            FileReader::Gzip(GzipDecoder::new(BufReader::new(file)))
578        } else {
579            let remaining_bytes = file.metadata().await?.len();
580            FileReader::Plain { file, remaining_bytes }
581        };
582
583        Ok(Self { file: file_reader, chunk: vec![], chunk_byte_len, highest_block: None })
584    }
585
586    /// Reads bytes from file and buffers as next chunk to decode. Returns byte length of next
587    /// chunk to read.
588    async fn read_next_chunk(&mut self) -> Result<Option<u64>, FileClientError> {
589        self.file.read_next_chunk(&mut self.chunk, self.chunk_byte_len).await
590    }
591
592    /// Read next chunk from file. Returns [`FileClient`] containing decoded chunk.
593    ///
594    /// For gzipped files, this method accumulates data until at least `chunk_byte_len` bytes
595    /// are available before processing. For plain files, it uses the original chunking logic.
596    pub async fn next_chunk<B: FullBlock>(
597        &mut self,
598        consensus: Arc<dyn Consensus<B>>,
599        parent_header: Option<SealedHeader<B::Header>>,
600    ) -> Result<Option<FileClient<B>>, FileClientError> {
601        self.next_chunk_with_invalid_block_handling(consensus, parent_header, false).await
602    }
603
604    /// Read next chunk from file, optionally skipping blocks that fail consensus pre-checks.
605    pub async fn next_chunk_with_invalid_block_handling<B: FullBlock>(
606        &mut self,
607        consensus: Arc<dyn Consensus<B>>,
608        parent_header: Option<SealedHeader<B::Header>>,
609        skip_invalid_blocks: bool,
610    ) -> Result<Option<FileClient<B>>, FileClientError> {
611        let Some(chunk_len) = self.read_next_chunk().await? else { return Ok(None) };
612
613        // make new file client from chunk
614        let DecodedFileChunk { file_client, remaining_bytes, .. } =
615            FileClientBuilder { consensus, parent_header, skip_invalid_blocks }
616                .build(&self.chunk[..], chunk_len)
617                .await?;
618
619        // save left over bytes
620        self.chunk = remaining_bytes;
621
622        Ok(Some(file_client))
623    }
624
625    /// Read next chunk from file. Returns [`FileClient`] containing decoded chunk.
626    pub async fn next_receipts_chunk<T>(&mut self) -> Result<Option<T>, T::Error>
627    where
628        T: FromReceiptReader,
629    {
630        let Some(next_chunk_byte_len) = self.read_next_chunk().await.map_err(|e| {
631            T::Error::from(match e {
632                FileClientError::Io(io_err) => io_err,
633                _ => io::Error::other(e.to_string()),
634            })
635        })?
636        else {
637            return Ok(None)
638        };
639
640        // make new file client from chunk
641        let DecodedFileChunk { file_client, remaining_bytes, highest_block } =
642            T::from_receipt_reader(&self.chunk[..], next_chunk_byte_len, self.highest_block)
643                .await?;
644
645        // save left over bytes
646        self.chunk = remaining_bytes;
647        // update highest block
648        self.highest_block = highest_block;
649
650        Ok(Some(file_client))
651    }
652}
653
/// Constructs a file client from a reader.
pub trait FromReader {
    /// Error returned by file client type.
    type Error: From<io::Error>;

    /// Output returned by file client type.
    type Output;

    /// Returns a file client decoded from `reader`.
    ///
    /// Callers in this module pass the length of the data behind `reader` as `num_bytes` (the
    /// chunk length, or the file length). Bytes that could not be decoded are returned in
    /// [`DecodedFileChunk::remaining_bytes`].
    fn build<R>(
        &self,
        reader: R,
        num_bytes: u64,
    ) -> impl Future<Output = Result<DecodedFileChunk<Self::Output>, Self::Error>>
    where
        Self: Sized,
        R: AsyncReadExt + Unpin;
}
672
/// Output from decoding a file chunk with [`FromReader::build`].
#[derive(Debug)]
pub struct DecodedFileChunk<T> {
    /// File client, i.e. the decoded part of chunk.
    pub file_client: T,
    /// Remaining bytes that have not been decoded, e.g. a partial block or a partial receipt.
    /// [`ChunkedFileReader`] keeps these bytes as the start of its next chunk.
    pub remaining_bytes: Vec<u8>,
    /// Highest block of decoded chunk. This is needed when decoding data that maps * to 1 with
    /// block number, like receipts. Block decoding in this module always leaves it `None`.
    pub highest_block: Option<u64>,
}
684
685#[cfg(test)]
686mod tests {
687    use super::*;
688    use crate::{
689        bodies::{
690            bodies::BodiesDownloaderBuilder,
691            test_utils::{insert_headers, zip_blocks},
692        },
693        headers::{reverse_headers::ReverseHeadersDownloaderBuilder, test_utils::child_header},
694        test_utils::{generate_bodies, generate_bodies_file},
695    };
696    use assert_matches::assert_matches;
697    use async_compression::tokio::write::GzipEncoder;
698    use futures_util::stream::StreamExt;
699    use rand::Rng;
700    use reth_consensus::{noop::NoopConsensus, test_utils::TestConsensus, ConsensusError};
701    use reth_ethereum_primitives::Block;
702    use reth_network_p2p::{
703        bodies::downloader::BodyDownloader,
704        headers::downloader::{HeaderDownloader, SyncTarget},
705    };
706    use reth_provider::test_utils::create_test_provider_factory;
707    use std::sync::Arc;
708    use tokio::{
709        fs::File,
710        io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom},
711    };
712
713    #[tokio::test]
714    async fn streams_bodies_from_buffer() {
715        // Generate some random blocks
716        let factory = create_test_provider_factory();
717        let (headers, mut bodies) = generate_bodies(0..=19);
718
719        insert_headers(&factory, &headers);
720
721        // create an empty file
722        let file = tempfile::tempfile().unwrap();
723
724        let client: Arc<FileClient<Block>> = Arc::new(
725            FileClient::from_file(file.into(), NoopConsensus::arc())
726                .await
727                .unwrap()
728                .with_bodies(bodies.clone().into_iter().collect()),
729        );
730        let mut downloader = BodiesDownloaderBuilder::default().build::<Block, _, _>(
731            client.clone(),
732            Arc::new(TestConsensus::default()),
733            factory,
734        );
735        downloader.set_download_range(0..=19).expect("failed to set download range");
736
737        assert_matches!(
738            downloader.next().await,
739            Some(Ok(res)) => assert_eq!(res, zip_blocks(headers.iter(), &mut bodies))
740        );
741    }
742
743    #[tokio::test]
744    async fn download_headers_at_fork_head() {
745        reth_tracing::init_test_tracing();
746
747        let p3 = SealedHeader::default();
748        let p2 = child_header(&p3);
749        let p1 = child_header(&p2);
750        let p0 = child_header(&p1);
751
752        let file = tempfile::tempfile().unwrap();
753        let client: Arc<FileClient<Block>> = Arc::new(
754            FileClient::from_file(file.into(), NoopConsensus::arc()).await.unwrap().with_headers(
755                HashMap::from([
756                    (0u64, p0.clone_header()),
757                    (1, p1.clone_header()),
758                    (2, p2.clone_header()),
759                    (3, p3.clone_header()),
760                ]),
761            ),
762        );
763
764        let mut downloader = ReverseHeadersDownloaderBuilder::default()
765            .stream_batch_size(3)
766            .request_limit(3)
767            .build(Arc::clone(&client), Arc::new(TestConsensus::default()));
768        downloader.update_local_head(p3.clone());
769        downloader.update_sync_target(SyncTarget::Tip(p0.hash()));
770
771        let headers = downloader.next().await.unwrap();
772        assert_eq!(headers.unwrap(), vec![p0, p1, p2]);
773        assert!(downloader.next().await.is_none());
774        assert!(downloader.next().await.is_none());
775    }
776
777    #[tokio::test]
778    async fn test_download_headers_from_file() {
779        reth_tracing::init_test_tracing();
780
781        // Generate some random blocks
782        let (file, headers, _) = generate_bodies_file(0..=19).await;
783        // now try to read them back
784        let client: Arc<FileClient<Block>> =
785            Arc::new(FileClient::from_file(file, NoopConsensus::arc()).await.unwrap());
786
787        // construct headers downloader and use first header
788        let mut header_downloader = ReverseHeadersDownloaderBuilder::default()
789            .build(Arc::clone(&client), Arc::new(TestConsensus::default()));
790        header_downloader.update_local_head(headers.first().unwrap().clone());
791        header_downloader.update_sync_target(SyncTarget::Tip(headers.last().unwrap().hash()));
792
793        // get headers first
794        let mut downloaded_headers = header_downloader.next().await.unwrap().unwrap();
795
796        // reverse to make sure it's in the right order before comparing
797        downloaded_headers.reverse();
798
799        // the first header is not included in the response
800        assert_eq!(downloaded_headers, headers[1..]);
801    }
802
803    #[tokio::test]
804    async fn test_download_bodies_from_file() {
805        // Generate some random blocks
806        let factory = create_test_provider_factory();
807        let (file, headers, mut bodies) = generate_bodies_file(0..=19).await;
808
809        // now try to read them back
810        let client: Arc<FileClient<Block>> =
811            Arc::new(FileClient::from_file(file, NoopConsensus::arc()).await.unwrap());
812
813        // insert headers in db for the bodies downloader
814        insert_headers(&factory, &headers);
815
816        let mut downloader = BodiesDownloaderBuilder::default().build::<Block, _, _>(
817            client.clone(),
818            Arc::new(TestConsensus::default()),
819            factory,
820        );
821        downloader.set_download_range(0..=19).expect("failed to set download range");
822
823        assert_matches!(
824            downloader.next().await,
825            Some(Ok(res)) => assert_eq!(res, zip_blocks(headers.iter(), &mut bodies))
826        );
827    }
828
829    #[tokio::test]
830    async fn strict_chunk_decode_fails_on_invalid_block() {
831        let (file, _, _) = generate_bodies_file(0..=2).await;
832        let chunk_byte_len = file.metadata().await.unwrap().len();
833        let mut reader = ChunkedFileReader::from_file(file, chunk_byte_len, false).await.unwrap();
834        let consensus = Arc::new(TestConsensus::default());
835        consensus.set_fail_validation(true);
836
837        let err = reader.next_chunk::<Block>(consensus, None).await.unwrap_err();
838
839        assert_matches!(err, FileClientError::Consensus(ConsensusError::BaseFeeMissing));
840    }
841
842    #[tokio::test]
843    async fn lenient_chunk_decode_skips_invalid_blocks() {
844        let (file, _, _) = generate_bodies_file(0..=2).await;
845        let chunk_byte_len = file.metadata().await.unwrap().len();
846        let mut reader = ChunkedFileReader::from_file(file, chunk_byte_len, false).await.unwrap();
847        let consensus = Arc::new(TestConsensus::default());
848        consensus.set_fail_validation(true);
849
850        let client = reader
851            .next_chunk_with_invalid_block_handling::<Block>(consensus, None, true)
852            .await
853            .unwrap()
854            .unwrap();
855
856        assert_eq!(client.headers_len(), 0);
857        assert!(client.tip().is_none());
858    }
859
860    #[tokio::test]
861    async fn test_chunk_download_headers_from_file() {
862        reth_tracing::init_test_tracing();
863
864        // Generate some random blocks
865        let (file, headers, _) = generate_bodies_file(0..=14).await;
866
867        // calculate min for chunk byte length range, pick a lower bound that guarantees at least
868        // one block will be read
869        let chunk_byte_len = rand::rng().random_range(2000..=10_000);
870        trace!(target: "downloaders::file::test", chunk_byte_len);
871
872        // init reader
873        let mut reader =
874            ChunkedFileReader::from_file(file, chunk_byte_len as u64, false).await.unwrap();
875
876        let mut downloaded_headers: Vec<SealedHeader> = vec![];
877
878        let mut local_header = headers.first().unwrap().clone();
879
880        // test
881        while let Some(client) =
882            reader.next_chunk::<Block>(NoopConsensus::arc(), None).await.unwrap()
883        {
884            let sync_target = client.tip_header().unwrap();
885
886            let sync_target_hash = sync_target.hash();
887
888            // construct headers downloader and use first header
889            let mut header_downloader = ReverseHeadersDownloaderBuilder::default()
890                .build(Arc::new(client), Arc::new(TestConsensus::default()));
891            header_downloader.update_local_head(local_header.clone());
892            header_downloader.update_sync_target(SyncTarget::Tip(sync_target_hash));
893
894            // get headers first
895            let mut downloaded_headers_chunk = header_downloader.next().await.unwrap().unwrap();
896
897            // export new local header to outer scope
898            local_header = sync_target;
899
900            // reverse to make sure it's in the right order before comparing
901            downloaded_headers_chunk.reverse();
902            downloaded_headers.extend_from_slice(&downloaded_headers_chunk);
903        }
904
905        // the first header is not included in the response
906        assert_eq!(headers[1..], downloaded_headers);
907    }
908
909    #[tokio::test]
910    async fn test_chunk_download_headers_from_gzip_file() {
911        reth_tracing::init_test_tracing();
912
913        // Generate some random blocks
914        let (file, headers, _) = generate_bodies_file(0..=14).await;
915
916        // Create a gzipped version of the file
917        let gzip_temp_file = tempfile::NamedTempFile::new().unwrap();
918        let gzip_path = gzip_temp_file.path().to_owned();
919        drop(gzip_temp_file); // Close the file so we can write to it
920
921        // Read original file content first
922        let mut original_file = file;
923        original_file.seek(SeekFrom::Start(0)).await.unwrap();
924        let mut original_content = Vec::new();
925        original_file.read_to_end(&mut original_content).await.unwrap();
926
927        let mut gzip_file = File::create(&gzip_path).await.unwrap();
928        let mut encoder = GzipEncoder::new(&mut gzip_file);
929
930        // Write the original content through the gzip encoder
931        encoder.write_all(&original_content).await.unwrap();
932        encoder.shutdown().await.unwrap();
933        drop(gzip_file);
934
935        // Reopen the gzipped file for reading
936        let gzip_file = File::open(&gzip_path).await.unwrap();
937
938        // calculate min for chunk byte length range, pick a lower bound that guarantees at least
939        // one block will be read
940        let chunk_byte_len = rand::rng().random_range(2000..=10_000);
941        trace!(target: "downloaders::file::test", chunk_byte_len);
942
943        // init reader with gzip=true
944        let mut reader =
945            ChunkedFileReader::from_file(gzip_file, chunk_byte_len as u64, true).await.unwrap();
946
947        let mut downloaded_headers: Vec<SealedHeader> = vec![];
948
949        let mut local_header = headers.first().unwrap().clone();
950
951        // test
952        while let Some(client) =
953            reader.next_chunk::<Block>(NoopConsensus::arc(), None).await.unwrap()
954        {
955            if client.headers_len() == 0 {
956                continue;
957            }
958
959            let sync_target = client.tip_header().expect("tip_header should not be None");
960
961            let sync_target_hash = sync_target.hash();
962
963            // construct headers downloader and use first header
964            let mut header_downloader = ReverseHeadersDownloaderBuilder::default()
965                .build(Arc::new(client), Arc::new(TestConsensus::default()));
966            header_downloader.update_local_head(local_header.clone());
967            header_downloader.update_sync_target(SyncTarget::Tip(sync_target_hash));
968
969            // get headers first
970            let mut downloaded_headers_chunk = header_downloader.next().await.unwrap().unwrap();
971
972            // export new local header to outer scope
973            local_header = sync_target;
974
975            // reverse to make sure it's in the right order before comparing
976            downloaded_headers_chunk.reverse();
977            downloaded_headers.extend_from_slice(&downloaded_headers_chunk);
978        }
979
980        // the first header is not included in the response
981        assert_eq!(headers[1..], downloaded_headers);
982    }
983}