1use std::{collections::HashMap, io, path::Path, sync::Arc};
2
3use alloy_consensus::BlockHeader;
4use alloy_eips::BlockHashOrNumber;
5use alloy_primitives::{BlockHash, BlockNumber, Sealable, B256};
6use futures::Future;
7use itertools::Either;
8use reth_consensus::{Consensus, ConsensusError};
9use reth_network_p2p::{
10 bodies::client::{BodiesClient, BodiesFut},
11 download::DownloadClient,
12 error::RequestError,
13 headers::client::{HeadersClient, HeadersDirection, HeadersFut, HeadersRequest},
14 priority::Priority,
15 BlockClient,
16};
17use reth_network_peers::PeerId;
18use reth_primitives::{SealedBlock, SealedHeader};
19use reth_primitives_traits::{Block, BlockBody, FullBlock};
20use thiserror::Error;
21use tokio::{fs::File, io::AsyncReadExt};
22use tokio_stream::StreamExt;
23use tokio_util::codec::FramedRead;
24use tracing::{debug, trace, warn};
25
26use super::file_codec::BlockFileCodec;
27use crate::receipt_file_client::FromReceiptReader;
28
/// Default number of bytes read per chunk from a chain file: 1_000_000_000 bytes (1 GB).
///
/// [`ChunkedFileReader::new`] falls back to this value when the caller passes no chunk length.
pub const DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE: u64 = 1_000_000_000;
33
/// In-memory store of block headers and bodies, typically decoded from a block file.
///
/// Headers are looked up by block number and bodies by block hash; `hash_to_number` bridges the
/// two so that a header can also be requested by hash.
#[derive(Debug, Clone)]
pub struct FileClient<B: Block = reth_primitives::Block> {
    /// Headers keyed by block number.
    headers: HashMap<BlockNumber, B::Header>,

    /// Lookup from block hash to block number.
    hash_to_number: HashMap<BlockHash, BlockNumber>,

    /// Block bodies keyed by block hash.
    bodies: HashMap<BlockHash, B::Body>,
}
56
/// Errors that can occur while building or reading a [`FileClient`].
#[derive(Debug, Error)]
pub enum FileClientError {
    /// A decoded block failed consensus validation.
    #[error(transparent)]
    Consensus(#[from] ConsensusError),

    /// An I/O error occurred while opening or reading the file.
    #[error(transparent)]
    Io(#[from] std::io::Error),

    /// An RLP decoding error, paired with a byte buffer. The block builder in this file treats
    /// those bytes as the undecoded remainder of the chunk and carries them over.
    #[error("{0}")]
    Rlp(alloy_rlp::Error, Vec<u8>),

    /// A free-form error described by a static message.
    #[error("{0}")]
    Custom(&'static str),
}
76
77impl From<&'static str> for FileClientError {
78 fn from(value: &'static str) -> Self {
79 Self::Custom(value)
80 }
81}
82
83impl<B: FullBlock> FileClient<B> {
84 pub async fn new<P: AsRef<Path>>(
86 path: P,
87 consensus: Arc<dyn Consensus<B, Error = ConsensusError>>,
88 ) -> Result<Self, FileClientError> {
89 let file = File::open(path).await?;
90 Self::from_file(file, consensus).await
91 }
92
93 pub(crate) async fn from_file(
95 mut file: File,
96 consensus: Arc<dyn Consensus<B, Error = ConsensusError>>,
97 ) -> Result<Self, FileClientError> {
98 let metadata = file.metadata().await?;
100 let file_len = metadata.len();
101
102 let mut reader = vec![];
103 file.read_to_end(&mut reader).await?;
104
105 Ok(FileClientBuilder { consensus, parent_header: None }
106 .build(&reader[..], file_len)
107 .await?
108 .file_client)
109 }
110
111 pub fn tip(&self) -> Option<B256> {
113 self.headers.get(&self.max_block()?).map(|h| h.hash_slow())
114 }
115
116 pub fn start(&self) -> Option<B256> {
118 self.headers.get(&self.min_block()?).map(|h| h.hash_slow())
119 }
120
121 pub fn max_block(&self) -> Option<u64> {
123 self.headers.keys().max().copied()
124 }
125
126 pub fn min_block(&self) -> Option<u64> {
128 self.headers.keys().min().copied()
129 }
130
131 pub fn tip_header(&self) -> Option<SealedHeader<B::Header>> {
134 self.headers.get(&self.max_block()?).map(|h| SealedHeader::seal_slow(h.clone()))
135 }
136
137 pub fn has_canonical_blocks(&self) -> bool {
139 if self.headers.is_empty() {
140 return true
141 }
142 let mut nums = self.headers.keys().copied().collect::<Vec<_>>();
143 nums.sort_unstable();
144 let mut iter = nums.into_iter();
145 let mut lowest = iter.next().expect("not empty");
146 for next in iter {
147 if next != lowest + 1 {
148 return false
149 }
150 lowest = next;
151 }
152 true
153 }
154
155 pub fn with_bodies(mut self, bodies: HashMap<BlockHash, B::Body>) -> Self {
157 self.bodies = bodies;
158 self
159 }
160
161 pub fn with_headers(mut self, headers: HashMap<BlockNumber, B::Header>) -> Self {
163 self.headers = headers;
164 for (number, header) in &self.headers {
165 self.hash_to_number.insert(header.hash_slow(), *number);
166 }
167 self
168 }
169
170 pub fn headers_len(&self) -> usize {
172 self.headers.len()
173 }
174
175 pub fn bodies_len(&self) -> usize {
177 self.bodies.len()
178 }
179
180 pub fn headers_iter(&self) -> impl Iterator<Item = &B::Header> {
182 self.headers.values()
183 }
184
185 pub fn bodies_iter_mut(&mut self) -> impl Iterator<Item = (u64, &mut B::Body)> {
189 let bodies = &mut self.bodies;
190 let numbers = &self.hash_to_number;
191 bodies.iter_mut().map(|(hash, body)| (numbers[hash], body))
192 }
193
194 pub fn total_transactions(&self) -> usize {
196 self.bodies.iter().fold(0, |acc, (_, body)| acc + body.transactions().len())
197 }
198}
199
/// Builds a [`FileClient`] from a byte reader, validating every decoded block.
struct FileClientBuilder<B: Block = reth_primitives::Block> {
    /// Consensus implementation used to validate decoded headers and blocks.
    pub consensus: Arc<dyn Consensus<B, Error = ConsensusError>>,
    /// Optional header to validate the first decoded block against. When `None`, decoded blocks
    /// are not validated against a parent at all.
    pub parent_header: Option<SealedHeader<B::Header>>,
}
204
205impl<B: FullBlock<Header: reth_primitives_traits::BlockHeader>> FromReader
206 for FileClientBuilder<B>
207{
208 type Error = FileClientError;
209 type Output = FileClient<B>;
210
211 fn build<R>(
213 &self,
214 reader: R,
215 num_bytes: u64,
216 ) -> impl Future<Output = Result<DecodedFileChunk<Self::Output>, Self::Error>>
217 where
218 R: AsyncReadExt + Unpin,
219 {
220 let mut headers = HashMap::default();
221 let mut hash_to_number = HashMap::default();
222 let mut bodies = HashMap::default();
223
224 let mut stream =
226 FramedRead::with_capacity(reader, BlockFileCodec::<B>::default(), num_bytes as usize);
227
228 trace!(target: "downloaders::file",
229 target_num_bytes=num_bytes,
230 capacity=stream.read_buffer().capacity(),
231 "init decode stream"
232 );
233
234 let mut remaining_bytes = vec![];
235
236 let mut log_interval = 0;
237 let mut log_interval_start_block = 0;
238
239 let mut parent_header = self.parent_header.clone();
240
241 async move {
242 while let Some(block_res) = stream.next().await {
243 let block = match block_res {
244 Ok(block) => block,
245 Err(FileClientError::Rlp(err, bytes)) => {
246 trace!(target: "downloaders::file",
247 %err,
248 bytes_len=bytes.len(),
249 "partial block returned from decoding chunk"
250 );
251 remaining_bytes = bytes;
252 break
253 }
254 Err(err) => return Err(err),
255 };
256
257 let block = SealedBlock::seal_slow(block);
258
259 self.consensus.validate_header(block.sealed_header())?;
261 if let Some(parent) = &parent_header {
262 self.consensus.validate_header_against_parent(block.sealed_header(), parent)?;
263 parent_header = Some(block.sealed_header().clone());
264 }
265
266 self.consensus.validate_block_pre_execution(&block)?;
268
269 let block_hash = block.hash();
271 let block_number = block.number();
272 let (header, body) = block.split_sealed_header_body();
273 headers.insert(block_number, header.unseal());
274 hash_to_number.insert(block_hash, block_number);
275 bodies.insert(block_hash, body);
276
277 if log_interval == 0 {
278 trace!(target: "downloaders::file",
279 block_number,
280 "read first block"
281 );
282 log_interval_start_block = block_number;
283 } else if log_interval % 100_000 == 0 {
284 trace!(target: "downloaders::file",
285 blocks=?log_interval_start_block..=block_number,
286 "read blocks from file"
287 );
288 log_interval_start_block = block_number + 1;
289 }
290 log_interval += 1;
291 }
292
293 trace!(target: "downloaders::file", blocks = headers.len(), "Initialized file client");
294
295 Ok(DecodedFileChunk {
296 file_client: FileClient { headers, hash_to_number, bodies },
297 remaining_bytes,
298 highest_block: None,
299 })
300 }
301 }
302}
303
304impl<B: FullBlock> HeadersClient for FileClient<B> {
305 type Header = B::Header;
306 type Output = HeadersFut<B::Header>;
307
308 fn get_headers_with_priority(
309 &self,
310 request: HeadersRequest,
311 _priority: Priority,
312 ) -> Self::Output {
313 let mut headers = Vec::new();
315 trace!(target: "downloaders::file", request=?request, "Getting headers");
316
317 let start_num = match request.start {
318 BlockHashOrNumber::Hash(hash) => match self.hash_to_number.get(&hash) {
319 Some(num) => *num,
320 None => {
321 warn!(%hash, "Could not find starting block number for requested header hash");
322 return Box::pin(async move { Err(RequestError::BadResponse) })
323 }
324 },
325 BlockHashOrNumber::Number(num) => num,
326 };
327
328 let range = if request.limit == 1 {
329 Either::Left(start_num..start_num + 1)
330 } else {
331 match request.direction {
332 HeadersDirection::Rising => Either::Left(start_num..start_num + request.limit),
333 HeadersDirection::Falling => {
334 Either::Right((start_num - request.limit + 1..=start_num).rev())
335 }
336 }
337 };
338
339 trace!(target: "downloaders::file", range=?range, "Getting headers with range");
340
341 for block_number in range {
342 match self.headers.get(&block_number).cloned() {
343 Some(header) => headers.push(header),
344 None => {
345 warn!(number=%block_number, "Could not find header");
346 return Box::pin(async move { Err(RequestError::BadResponse) })
347 }
348 }
349 }
350
351 Box::pin(async move { Ok((PeerId::default(), headers).into()) })
352 }
353}
354
355impl<B: FullBlock> BodiesClient for FileClient<B> {
356 type Body = B::Body;
357 type Output = BodiesFut<B::Body>;
358
359 fn get_block_bodies_with_priority(
360 &self,
361 hashes: Vec<B256>,
362 _priority: Priority,
363 ) -> Self::Output {
364 let mut bodies = Vec::new();
366
367 for hash in hashes {
370 match self.bodies.get(&hash).cloned() {
371 Some(body) => bodies.push(body),
372 None => return Box::pin(async move { Err(RequestError::BadResponse) }),
373 }
374 }
375
376 Box::pin(async move { Ok((PeerId::default(), bodies).into()) })
377 }
378}
379
380impl<B: FullBlock> DownloadClient for FileClient<B> {
381 fn report_bad_message(&self, _peer_id: PeerId) {
382 trace!("Reported a bad message on a file client, the file may be corrupted or invalid");
383 }
385
386 fn num_connected_peers(&self) -> usize {
387 1
389 }
390}
391
/// Marks [`FileClient`] as a combined headers-and-bodies client for block type `B`.
impl<B: FullBlock> BlockClient for FileClient<B> {
    type Block = B;
}
395
/// Reads a file piece by piece, decoding each piece into its own client.
#[derive(Debug)]
pub struct ChunkedFileReader {
    /// The file being read.
    file: File,
    /// Number of bytes of the file that have not been read yet; decremented after every read.
    file_byte_len: u64,
    /// Undecoded bytes left over from the previous chunk, extended in place with newly read
    /// bytes.
    chunk: Vec<u8>,
    /// Maximum byte length of a single chunk.
    chunk_byte_len: u64,
    /// Highest block value reported by the last decoded receipts chunk; passed on to the next
    /// receipts read.
    highest_block: Option<u64>,
}
411
412impl ChunkedFileReader {
413 pub const fn file_len(&self) -> u64 {
415 self.file_byte_len
416 }
417
418 pub async fn new<P: AsRef<Path>>(
421 path: P,
422 chunk_byte_len: Option<u64>,
423 ) -> Result<Self, FileClientError> {
424 let file = File::open(path).await?;
425 let chunk_byte_len = chunk_byte_len.unwrap_or(DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE);
426
427 Self::from_file(file, chunk_byte_len).await
428 }
429
430 pub async fn from_file(file: File, chunk_byte_len: u64) -> Result<Self, FileClientError> {
432 let metadata = file.metadata().await?;
434 let file_byte_len = metadata.len();
435
436 Ok(Self { file, file_byte_len, chunk: vec![], chunk_byte_len, highest_block: None })
437 }
438
439 fn chunk_len(&self) -> u64 {
442 let Self { chunk_byte_len, file_byte_len, .. } = *self;
443 let file_byte_len = file_byte_len + self.chunk.len() as u64;
444
445 if chunk_byte_len > file_byte_len {
446 file_byte_len
448 } else {
449 chunk_byte_len
450 }
451 }
452
453 async fn read_next_chunk(&mut self) -> Result<Option<u64>, io::Error> {
456 if self.file_byte_len == 0 && self.chunk.is_empty() {
457 return Ok(None)
459 }
460
461 let chunk_target_len = self.chunk_len();
462 let old_bytes_len = self.chunk.len() as u64;
463
464 let new_read_bytes_target_len = chunk_target_len - old_bytes_len;
466
467 let prev_read_bytes_len = self.chunk.len();
469 self.chunk.extend(std::iter::repeat_n(0, new_read_bytes_target_len as usize));
470 let reader = &mut self.chunk[prev_read_bytes_len..];
471
472 let new_read_bytes_len = self.file.read_exact(reader).await? as u64;
474 let next_chunk_byte_len = self.chunk.len();
475
476 self.file_byte_len -= new_read_bytes_len;
478
479 debug!(target: "downloaders::file",
480 max_chunk_byte_len=self.chunk_byte_len,
481 prev_read_bytes_len,
482 new_read_bytes_target_len,
483 new_read_bytes_len,
484 next_chunk_byte_len,
485 remaining_file_byte_len=self.file_byte_len,
486 "new bytes were read from file"
487 );
488
489 Ok(Some(next_chunk_byte_len as u64))
490 }
491
492 pub async fn next_chunk<B: FullBlock>(
494 &mut self,
495 consensus: Arc<dyn Consensus<B, Error = ConsensusError>>,
496 parent_header: Option<SealedHeader<B::Header>>,
497 ) -> Result<Option<FileClient<B>>, FileClientError> {
498 let Some(next_chunk_byte_len) = self.read_next_chunk().await? else { return Ok(None) };
499
500 let DecodedFileChunk { file_client, remaining_bytes, .. } =
502 FileClientBuilder { consensus, parent_header }
503 .build(&self.chunk[..], next_chunk_byte_len)
504 .await?;
505
506 self.chunk = remaining_bytes;
508
509 Ok(Some(file_client))
510 }
511
512 pub async fn next_receipts_chunk<T>(&mut self) -> Result<Option<T>, T::Error>
514 where
515 T: FromReceiptReader,
516 {
517 let Some(next_chunk_byte_len) = self.read_next_chunk().await? else { return Ok(None) };
518
519 let DecodedFileChunk { file_client, remaining_bytes, highest_block } =
521 T::from_receipt_reader(&self.chunk[..], next_chunk_byte_len, self.highest_block)
522 .await?;
523
524 self.chunk = remaining_bytes;
526 self.highest_block = highest_block;
528
529 Ok(Some(file_client))
530 }
531}
532
/// Builds a value from the bytes of an asynchronous reader.
pub trait FromReader {
    /// Error returned when building fails; I/O errors must be convertible into it.
    type Error: From<io::Error>;

    /// The value produced by [`FromReader::build`].
    type Output;

    /// Builds [`Self::Output`] from `reader`, returning it wrapped in a [`DecodedFileChunk`].
    ///
    /// `num_bytes` tells the implementation how many bytes to expect; the block-file
    /// implementation in this file uses it as its read-buffer capacity.
    fn build<R>(
        &self,
        reader: R,
        num_bytes: u64,
    ) -> impl Future<Output = Result<DecodedFileChunk<Self::Output>, Self::Error>>
    where
        Self: Sized,
        R: AsyncReadExt + Unpin;
}
551
/// Result of decoding one chunk of a file.
#[derive(Debug)]
pub struct DecodedFileChunk<T> {
    /// The value decoded from the chunk.
    pub file_client: T,
    /// Bytes that could not be decoded; [`ChunkedFileReader`] carries them into the next chunk.
    pub remaining_bytes: Vec<u8>,
    /// Highest block value reported by the decoder, if any. The block-file builder in this file
    /// always sets it to `None`.
    pub highest_block: Option<u64>,
}
563
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        bodies::{
            bodies::BodiesDownloaderBuilder,
            test_utils::{insert_headers, zip_blocks},
        },
        headers::{reverse_headers::ReverseHeadersDownloaderBuilder, test_utils::child_header},
        test_utils::{generate_bodies, generate_bodies_file},
    };
    use assert_matches::assert_matches;
    use futures_util::stream::StreamExt;
    use rand::Rng;
    use reth_consensus::{noop::NoopConsensus, test_utils::TestConsensus};
    use reth_network_p2p::{
        bodies::downloader::BodyDownloader,
        headers::downloader::{HeaderDownloader, SyncTarget},
    };
    use reth_primitives::Block;
    use reth_provider::test_utils::create_test_provider_factory;
    use std::sync::Arc;

    /// Bodies injected with `with_bodies` are served to the bodies downloader.
    #[tokio::test]
    async fn streams_bodies_from_buffer() {
        let factory = create_test_provider_factory();
        let (headers, mut bodies) = generate_bodies(0..=19);

        insert_headers(factory.db_ref().db(), &headers);

        // An empty file: the client starts without blocks and gets its bodies injected below.
        let file = tempfile::tempfile().unwrap();

        let client: Arc<FileClient> = Arc::new(
            FileClient::from_file(file.into(), NoopConsensus::arc())
                .await
                .unwrap()
                .with_bodies(bodies.clone()),
        );
        let mut downloader = BodiesDownloaderBuilder::default()
            .build::<reth_primitives::Block, _, _>(
                client.clone(),
                Arc::new(TestConsensus::default()),
                factory,
            );
        downloader.set_download_range(0..=19).expect("failed to set download range");

        assert_matches!(
            downloader.next().await,
            Some(Ok(res)) => assert_eq!(res, zip_blocks(headers.iter(), &mut bodies))
        );
    }

    /// Headers injected with `with_headers` are served to the reverse headers downloader.
    #[tokio::test]
    async fn download_headers_at_fork_head() {
        reth_tracing::init_test_tracing();

        let p3 = SealedHeader::default();
        let p2 = child_header(&p3);
        let p1 = child_header(&p2);
        let p0 = child_header(&p1);

        let file = tempfile::tempfile().unwrap();
        let client: Arc<FileClient> = Arc::new(
            FileClient::from_file(file.into(), NoopConsensus::arc()).await.unwrap().with_headers(
                HashMap::from([
                    (0u64, p0.clone_header()),
                    (1, p1.clone_header()),
                    (2, p2.clone_header()),
                    (3, p3.clone_header()),
                ]),
            ),
        );

        let mut downloader = ReverseHeadersDownloaderBuilder::default()
            .stream_batch_size(3)
            .request_limit(3)
            .build(Arc::clone(&client), Arc::new(TestConsensus::default()));
        downloader.update_local_head(p3.clone());
        downloader.update_sync_target(SyncTarget::Tip(p0.hash()));

        let headers = downloader.next().await.unwrap();
        assert_eq!(headers, Ok(vec![p0, p1, p2]));
        assert!(downloader.next().await.is_none());
        assert!(downloader.next().await.is_none());
    }

    /// Headers decoded from a generated block file can be downloaded.
    #[tokio::test]
    async fn test_download_headers_from_file() {
        reth_tracing::init_test_tracing();

        let (file, headers, _) = generate_bodies_file(0..=19).await;
        let client: Arc<FileClient> =
            Arc::new(FileClient::from_file(file, NoopConsensus::arc()).await.unwrap());

        let mut header_downloader = ReverseHeadersDownloaderBuilder::default()
            .build(Arc::clone(&client), Arc::new(TestConsensus::default()));
        header_downloader.update_local_head(headers.first().unwrap().clone());
        header_downloader.update_sync_target(SyncTarget::Tip(headers.last().unwrap().hash()));

        let mut downloaded_headers = header_downloader.next().await.unwrap().unwrap();

        // The downloader yields headers from the tip downwards; restore ascending order.
        downloaded_headers.reverse();

        // The local head itself is not part of the response.
        assert_eq!(downloaded_headers, headers[1..]);
    }

    /// Bodies decoded from a generated block file can be downloaded.
    #[tokio::test]
    async fn test_download_bodies_from_file() {
        let factory = create_test_provider_factory();
        let (file, headers, mut bodies) = generate_bodies_file(0..=19).await;

        let client: Arc<FileClient> =
            Arc::new(FileClient::from_file(file, NoopConsensus::arc()).await.unwrap());

        insert_headers(factory.db_ref().db(), &headers);

        let mut downloader = BodiesDownloaderBuilder::default()
            .build::<reth_primitives::Block, _, _>(
                client.clone(),
                Arc::new(TestConsensus::default()),
                factory,
            );
        downloader.set_download_range(0..=19).expect("failed to set download range");

        assert_matches!(
            downloader.next().await,
            Some(Ok(res)) => assert_eq!(res, zip_blocks(headers.iter(), &mut bodies))
        );
    }

    /// Reading a block file chunk by chunk yields the same headers as reading it at once.
    #[tokio::test]
    async fn test_chunk_download_headers_from_file() {
        reth_tracing::init_test_tracing();

        let (file, headers, _) = generate_bodies_file(0..=14).await;

        // Random chunk length, generated as `u64` so no cast is needed below.
        let chunk_byte_len = rand::thread_rng().gen_range(2000..=10_000u64);
        trace!(target: "downloaders::file::test", chunk_byte_len);

        let mut reader = ChunkedFileReader::from_file(file, chunk_byte_len).await.unwrap();

        let mut downloaded_headers: Vec<SealedHeader> = vec![];

        let mut local_header = headers.first().unwrap().clone();

        while let Some(client) =
            reader.next_chunk::<Block>(NoopConsensus::arc(), None).await.unwrap()
        {
            let sync_target = client.tip_header().unwrap();

            let sync_target_hash = sync_target.hash();

            let mut header_downloader = ReverseHeadersDownloaderBuilder::default()
                .build(Arc::new(client), Arc::new(TestConsensus::default()));
            header_downloader.update_local_head(local_header.clone());
            header_downloader.update_sync_target(SyncTarget::Tip(sync_target_hash));

            let mut downloaded_headers_chunk = header_downloader.next().await.unwrap().unwrap();

            // The tip of this chunk becomes the local head for the next one.
            local_header = sync_target;

            // Restore ascending order before appending.
            downloaded_headers_chunk.reverse();
            downloaded_headers.extend_from_slice(&downloaded_headers_chunk);
        }

        // The very first header is the initial local head and is never downloaded.
        assert_eq!(headers[1..], downloaded_headers);
    }
}