1use alloy_consensus::BlockHeader;
2use alloy_eips::BlockHashOrNumber;
3use alloy_primitives::{BlockHash, BlockNumber, Sealable, B256};
4use futures::Future;
5use itertools::Either;
6use reth_consensus::{Consensus, ConsensusError};
7use reth_network_p2p::{
8 bodies::client::{BodiesClient, BodiesFut},
9 download::DownloadClient,
10 error::RequestError,
11 headers::client::{HeadersClient, HeadersDirection, HeadersFut, HeadersRequest},
12 priority::Priority,
13 BlockClient,
14};
15use reth_network_peers::PeerId;
16use reth_primitives_traits::{Block, BlockBody, FullBlock, SealedBlock, SealedHeader};
17use std::{collections::HashMap, io, path::Path, sync::Arc};
18use thiserror::Error;
19use tokio::{fs::File, io::AsyncReadExt};
20use tokio_stream::StreamExt;
21use tokio_util::codec::FramedRead;
22use tracing::{debug, trace, warn};
23
24use super::file_codec::BlockFileCodec;
25use crate::receipt_file_client::FromReceiptReader;
26
/// Default maximum byte length of a chunk read by [`ChunkedFileReader`] when no explicit chunk
/// byte length is given: 10^9 bytes.
pub const DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE: u64 = 1_000 * 1_000 * 1_000;
31
32#[derive(Debug, Clone)]
44pub struct FileClient<B: Block> {
45 headers: HashMap<BlockNumber, B::Header>,
47
48 hash_to_number: HashMap<BlockHash, BlockNumber>,
50
51 bodies: HashMap<BlockHash, B::Body>,
53}
54
55#[derive(Debug, Error)]
57pub enum FileClientError {
58 #[error(transparent)]
60 Consensus(#[from] ConsensusError),
61
62 #[error(transparent)]
64 Io(#[from] std::io::Error),
65
66 #[error("{0}")]
68 Rlp(alloy_rlp::Error, Vec<u8>),
69
70 #[error("{0}")]
72 Custom(&'static str),
73}
74
75impl From<&'static str> for FileClientError {
76 fn from(value: &'static str) -> Self {
77 Self::Custom(value)
78 }
79}
80
81impl<B: FullBlock> FileClient<B> {
82 pub async fn new<P: AsRef<Path>>(
84 path: P,
85 consensus: Arc<dyn Consensus<B, Error = ConsensusError>>,
86 ) -> Result<Self, FileClientError> {
87 let file = File::open(path).await?;
88 Self::from_file(file, consensus).await
89 }
90
91 pub(crate) async fn from_file(
93 mut file: File,
94 consensus: Arc<dyn Consensus<B, Error = ConsensusError>>,
95 ) -> Result<Self, FileClientError> {
96 let metadata = file.metadata().await?;
98 let file_len = metadata.len();
99
100 let mut reader = vec![];
101 file.read_to_end(&mut reader).await?;
102
103 Ok(FileClientBuilder { consensus, parent_header: None }
104 .build(&reader[..], file_len)
105 .await?
106 .file_client)
107 }
108
109 pub fn tip(&self) -> Option<B256> {
111 self.headers.get(&self.max_block()?).map(|h| h.hash_slow())
112 }
113
114 pub fn start(&self) -> Option<B256> {
116 self.headers.get(&self.min_block()?).map(|h| h.hash_slow())
117 }
118
119 pub fn max_block(&self) -> Option<u64> {
121 self.headers.keys().max().copied()
122 }
123
124 pub fn min_block(&self) -> Option<u64> {
126 self.headers.keys().min().copied()
127 }
128
129 pub fn tip_header(&self) -> Option<SealedHeader<B::Header>> {
132 self.headers.get(&self.max_block()?).map(|h| SealedHeader::seal_slow(h.clone()))
133 }
134
135 pub fn has_canonical_blocks(&self) -> bool {
137 if self.headers.is_empty() {
138 return true
139 }
140 let mut nums = self.headers.keys().copied().collect::<Vec<_>>();
141 nums.sort_unstable();
142 let mut iter = nums.into_iter();
143 let mut lowest = iter.next().expect("not empty");
144 for next in iter {
145 if next != lowest + 1 {
146 return false
147 }
148 lowest = next;
149 }
150 true
151 }
152
153 pub fn with_bodies(mut self, bodies: HashMap<BlockHash, B::Body>) -> Self {
155 self.bodies = bodies;
156 self
157 }
158
159 pub fn with_headers(mut self, headers: HashMap<BlockNumber, B::Header>) -> Self {
161 self.headers = headers;
162 for (number, header) in &self.headers {
163 self.hash_to_number.insert(header.hash_slow(), *number);
164 }
165 self
166 }
167
168 pub fn headers_len(&self) -> usize {
170 self.headers.len()
171 }
172
173 pub fn bodies_len(&self) -> usize {
175 self.bodies.len()
176 }
177
178 pub fn headers_iter(&self) -> impl Iterator<Item = &B::Header> {
180 self.headers.values()
181 }
182
183 pub fn bodies_iter_mut(&mut self) -> impl Iterator<Item = (u64, &mut B::Body)> {
187 let bodies = &mut self.bodies;
188 let numbers = &self.hash_to_number;
189 bodies.iter_mut().map(|(hash, body)| (numbers[hash], body))
190 }
191
192 pub fn total_transactions(&self) -> usize {
194 self.bodies.iter().fold(0, |acc, (_, body)| acc + body.transactions().len())
195 }
196}
197
198struct FileClientBuilder<B: Block> {
199 pub consensus: Arc<dyn Consensus<B, Error = ConsensusError>>,
200 pub parent_header: Option<SealedHeader<B::Header>>,
201}
202
203impl<B: FullBlock<Header: reth_primitives_traits::BlockHeader>> FromReader
204 for FileClientBuilder<B>
205{
206 type Error = FileClientError;
207 type Output = FileClient<B>;
208
209 fn build<R>(
211 &self,
212 reader: R,
213 num_bytes: u64,
214 ) -> impl Future<Output = Result<DecodedFileChunk<Self::Output>, Self::Error>>
215 where
216 R: AsyncReadExt + Unpin,
217 {
218 let mut headers = HashMap::default();
219 let mut hash_to_number = HashMap::default();
220 let mut bodies = HashMap::default();
221
222 let mut stream =
224 FramedRead::with_capacity(reader, BlockFileCodec::<B>::default(), num_bytes as usize);
225
226 trace!(target: "downloaders::file",
227 target_num_bytes=num_bytes,
228 capacity=stream.read_buffer().capacity(),
229 "init decode stream"
230 );
231
232 let mut remaining_bytes = vec![];
233
234 let mut log_interval = 0;
235 let mut log_interval_start_block = 0;
236
237 let mut parent_header = self.parent_header.clone();
238
239 async move {
240 while let Some(block_res) = stream.next().await {
241 let block = match block_res {
242 Ok(block) => block,
243 Err(FileClientError::Rlp(err, bytes)) => {
244 trace!(target: "downloaders::file",
245 %err,
246 bytes_len=bytes.len(),
247 "partial block returned from decoding chunk"
248 );
249 remaining_bytes = bytes;
250 break
251 }
252 Err(err) => return Err(err),
253 };
254
255 let block = SealedBlock::seal_slow(block);
256
257 self.consensus.validate_header(block.sealed_header())?;
259 if let Some(parent) = &parent_header {
260 self.consensus.validate_header_against_parent(block.sealed_header(), parent)?;
261 parent_header = Some(block.sealed_header().clone());
262 }
263
264 self.consensus.validate_block_pre_execution(&block)?;
266
267 let block_hash = block.hash();
269 let block_number = block.number();
270 let (header, body) = block.split_sealed_header_body();
271 headers.insert(block_number, header.unseal());
272 hash_to_number.insert(block_hash, block_number);
273 bodies.insert(block_hash, body);
274
275 if log_interval == 0 {
276 trace!(target: "downloaders::file",
277 block_number,
278 "read first block"
279 );
280 log_interval_start_block = block_number;
281 } else if log_interval % 100_000 == 0 {
282 trace!(target: "downloaders::file",
283 blocks=?log_interval_start_block..=block_number,
284 "read blocks from file"
285 );
286 log_interval_start_block = block_number + 1;
287 }
288 log_interval += 1;
289 }
290
291 trace!(target: "downloaders::file", blocks = headers.len(), "Initialized file client");
292
293 Ok(DecodedFileChunk {
294 file_client: FileClient { headers, hash_to_number, bodies },
295 remaining_bytes,
296 highest_block: None,
297 })
298 }
299 }
300}
301
302impl<B: FullBlock> HeadersClient for FileClient<B> {
303 type Header = B::Header;
304 type Output = HeadersFut<B::Header>;
305
306 fn get_headers_with_priority(
307 &self,
308 request: HeadersRequest,
309 _priority: Priority,
310 ) -> Self::Output {
311 let mut headers = Vec::new();
313 trace!(target: "downloaders::file", request=?request, "Getting headers");
314
315 let start_num = match request.start {
316 BlockHashOrNumber::Hash(hash) => match self.hash_to_number.get(&hash) {
317 Some(num) => *num,
318 None => {
319 warn!(%hash, "Could not find starting block number for requested header hash");
320 return Box::pin(async move { Err(RequestError::BadResponse) })
321 }
322 },
323 BlockHashOrNumber::Number(num) => num,
324 };
325
326 let range = if request.limit == 1 {
327 Either::Left(start_num..start_num + 1)
328 } else {
329 match request.direction {
330 HeadersDirection::Rising => Either::Left(start_num..start_num + request.limit),
331 HeadersDirection::Falling => {
332 Either::Right((start_num - request.limit + 1..=start_num).rev())
333 }
334 }
335 };
336
337 trace!(target: "downloaders::file", range=?range, "Getting headers with range");
338
339 for block_number in range {
340 match self.headers.get(&block_number).cloned() {
341 Some(header) => headers.push(header),
342 None => {
343 warn!(number=%block_number, "Could not find header");
344 return Box::pin(async move { Err(RequestError::BadResponse) })
345 }
346 }
347 }
348
349 Box::pin(async move { Ok((PeerId::default(), headers).into()) })
350 }
351}
352
353impl<B: FullBlock> BodiesClient for FileClient<B> {
354 type Body = B::Body;
355 type Output = BodiesFut<B::Body>;
356
357 fn get_block_bodies_with_priority(
358 &self,
359 hashes: Vec<B256>,
360 _priority: Priority,
361 ) -> Self::Output {
362 let mut bodies = Vec::new();
364
365 for hash in hashes {
368 match self.bodies.get(&hash).cloned() {
369 Some(body) => bodies.push(body),
370 None => return Box::pin(async move { Err(RequestError::BadResponse) }),
371 }
372 }
373
374 Box::pin(async move { Ok((PeerId::default(), bodies).into()) })
375 }
376}
377
378impl<B: FullBlock> DownloadClient for FileClient<B> {
379 fn report_bad_message(&self, _peer_id: PeerId) {
380 trace!("Reported a bad message on a file client, the file may be corrupted or invalid");
381 }
383
384 fn num_connected_peers(&self) -> usize {
385 1
387 }
388}
389
390impl<B: FullBlock> BlockClient for FileClient<B> {
391 type Block = B;
392}
393
394#[derive(Debug)]
396pub struct ChunkedFileReader {
397 file: File,
399 file_byte_len: u64,
401 chunk: Vec<u8>,
403 chunk_byte_len: u64,
405 highest_block: Option<u64>,
408}
409
410impl ChunkedFileReader {
411 pub const fn file_len(&self) -> u64 {
413 self.file_byte_len
414 }
415
416 pub async fn new<P: AsRef<Path>>(
419 path: P,
420 chunk_byte_len: Option<u64>,
421 ) -> Result<Self, FileClientError> {
422 let file = File::open(path).await?;
423 let chunk_byte_len = chunk_byte_len.unwrap_or(DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE);
424
425 Self::from_file(file, chunk_byte_len).await
426 }
427
428 pub async fn from_file(file: File, chunk_byte_len: u64) -> Result<Self, FileClientError> {
430 let metadata = file.metadata().await?;
432 let file_byte_len = metadata.len();
433
434 Ok(Self { file, file_byte_len, chunk: vec![], chunk_byte_len, highest_block: None })
435 }
436
437 fn chunk_len(&self) -> u64 {
440 let Self { chunk_byte_len, file_byte_len, .. } = *self;
441 let file_byte_len = file_byte_len + self.chunk.len() as u64;
442
443 if chunk_byte_len > file_byte_len {
444 file_byte_len
446 } else {
447 chunk_byte_len
448 }
449 }
450
451 async fn read_next_chunk(&mut self) -> Result<Option<u64>, io::Error> {
454 if self.file_byte_len == 0 && self.chunk.is_empty() {
455 return Ok(None)
457 }
458
459 let chunk_target_len = self.chunk_len();
460 let old_bytes_len = self.chunk.len() as u64;
461
462 let new_read_bytes_target_len = chunk_target_len - old_bytes_len;
464
465 let prev_read_bytes_len = self.chunk.len();
467 self.chunk.extend(std::iter::repeat_n(0, new_read_bytes_target_len as usize));
468 let reader = &mut self.chunk[prev_read_bytes_len..];
469
470 let new_read_bytes_len = self.file.read_exact(reader).await? as u64;
472 let next_chunk_byte_len = self.chunk.len();
473
474 self.file_byte_len -= new_read_bytes_len;
476
477 debug!(target: "downloaders::file",
478 max_chunk_byte_len=self.chunk_byte_len,
479 prev_read_bytes_len,
480 new_read_bytes_target_len,
481 new_read_bytes_len,
482 next_chunk_byte_len,
483 remaining_file_byte_len=self.file_byte_len,
484 "new bytes were read from file"
485 );
486
487 Ok(Some(next_chunk_byte_len as u64))
488 }
489
490 pub async fn next_chunk<B: FullBlock>(
492 &mut self,
493 consensus: Arc<dyn Consensus<B, Error = ConsensusError>>,
494 parent_header: Option<SealedHeader<B::Header>>,
495 ) -> Result<Option<FileClient<B>>, FileClientError> {
496 let Some(next_chunk_byte_len) = self.read_next_chunk().await? else { return Ok(None) };
497
498 let DecodedFileChunk { file_client, remaining_bytes, .. } =
500 FileClientBuilder { consensus, parent_header }
501 .build(&self.chunk[..], next_chunk_byte_len)
502 .await?;
503
504 self.chunk = remaining_bytes;
506
507 Ok(Some(file_client))
508 }
509
510 pub async fn next_receipts_chunk<T>(&mut self) -> Result<Option<T>, T::Error>
512 where
513 T: FromReceiptReader,
514 {
515 let Some(next_chunk_byte_len) = self.read_next_chunk().await? else { return Ok(None) };
516
517 let DecodedFileChunk { file_client, remaining_bytes, highest_block } =
519 T::from_receipt_reader(&self.chunk[..], next_chunk_byte_len, self.highest_block)
520 .await?;
521
522 self.chunk = remaining_bytes;
524 self.highest_block = highest_block;
526
527 Ok(Some(file_client))
528 }
529}
530
531pub trait FromReader {
533 type Error: From<io::Error>;
535
536 type Output;
538
539 fn build<R>(
541 &self,
542 reader: R,
543 num_bytes: u64,
544 ) -> impl Future<Output = Result<DecodedFileChunk<Self::Output>, Self::Error>>
545 where
546 Self: Sized,
547 R: AsyncReadExt + Unpin;
548}
549
/// Result of decoding one chunk of a file.
#[derive(Debug)]
pub struct DecodedFileChunk<T> {
    /// The file client built from the decoded part of the chunk.
    pub file_client: T,
    /// Trailing bytes that could not be decoded; they are carried over into the next chunk.
    pub remaining_bytes: Vec<u8>,
    /// Highest block number decoded so far, if the decoder tracks it (`None` otherwise).
    pub highest_block: Option<u64>,
}
561
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        bodies::{
            bodies::BodiesDownloaderBuilder,
            test_utils::{insert_headers, zip_blocks},
        },
        headers::{reverse_headers::ReverseHeadersDownloaderBuilder, test_utils::child_header},
        test_utils::{generate_bodies, generate_bodies_file},
    };
    use assert_matches::assert_matches;
    use futures_util::stream::StreamExt;
    use rand::Rng;
    use reth_consensus::{noop::NoopConsensus, test_utils::TestConsensus};
    use reth_ethereum_primitives::Block;
    use reth_network_p2p::{
        bodies::downloader::BodyDownloader,
        headers::downloader::{HeaderDownloader, SyncTarget},
    };
    use reth_provider::test_utils::create_test_provider_factory;
    use std::sync::Arc;

    /// Bodies injected via `with_bodies` into a client built from an empty file are served to
    /// the bodies downloader.
    #[tokio::test]
    async fn streams_bodies_from_buffer() {
        // Generate some random blocks and persist their headers.
        let provider_factory = create_test_provider_factory();
        let (headers, mut bodies) = generate_bodies(0..=19);
        insert_headers(provider_factory.db_ref().db(), &headers);

        // The backing file is empty; every body comes from `with_bodies`.
        let empty_file = tempfile::tempfile().unwrap();
        let client = Arc::new(
            FileClient::<Block>::from_file(empty_file.into(), NoopConsensus::arc())
                .await
                .unwrap()
                .with_bodies(bodies.clone()),
        );

        let mut downloader = BodiesDownloaderBuilder::default().build::<Block, _, _>(
            Arc::clone(&client),
            Arc::new(TestConsensus::default()),
            provider_factory,
        );
        downloader.set_download_range(0..=19).expect("failed to set download range");

        assert_matches!(
            downloader.next().await,
            Some(Ok(res)) => assert_eq!(res, zip_blocks(headers.iter(), &mut bodies))
        );
    }

    /// With local head `p3` and tip `p0`, the reverse headers downloader yields `p0`, `p1`, `p2`
    /// from headers injected via `with_headers`, and then nothing more.
    #[tokio::test]
    async fn download_headers_at_fork_head() {
        reth_tracing::init_test_tracing();

        // Build the chain p3 <- p2 <- p1 <- p0.
        let p3 = SealedHeader::default();
        let p2 = child_header(&p3);
        let p1 = child_header(&p2);
        let p0 = child_header(&p1);

        let empty_file = tempfile::tempfile().unwrap();
        let client = Arc::new(
            FileClient::<Block>::from_file(empty_file.into(), NoopConsensus::arc())
                .await
                .unwrap()
                .with_headers(HashMap::from([
                    (0u64, p0.clone_header()),
                    (1, p1.clone_header()),
                    (2, p2.clone_header()),
                    (3, p3.clone_header()),
                ])),
        );

        let mut downloader = ReverseHeadersDownloaderBuilder::default()
            .stream_batch_size(3)
            .request_limit(3)
            .build(Arc::clone(&client), Arc::new(TestConsensus::default()));
        downloader.update_local_head(p3.clone());
        downloader.update_sync_target(SyncTarget::Tip(p0.hash()));

        let first_batch = downloader.next().await.unwrap();
        assert_eq!(first_batch, Ok(vec![p0, p1, p2]));
        assert!(downloader.next().await.is_none());
        assert!(downloader.next().await.is_none());
    }

    /// Headers decoded from a generated chain file are all downloaded, except the local head.
    #[tokio::test]
    async fn test_download_headers_from_file() {
        reth_tracing::init_test_tracing();

        // Write random blocks to a file, then read them back.
        let (file, headers, _) = generate_bodies_file(0..=19).await;
        let client =
            Arc::new(FileClient::<Block>::from_file(file, NoopConsensus::arc()).await.unwrap());

        let local_head = headers.first().unwrap().clone();
        let tip_hash = headers.last().unwrap().hash();

        let mut header_downloader = ReverseHeadersDownloaderBuilder::default()
            .build(Arc::clone(&client), Arc::new(TestConsensus::default()));
        header_downloader.update_local_head(local_head);
        header_downloader.update_sync_target(SyncTarget::Tip(tip_hash));

        // Headers arrive tip-first; flip them into ascending order before comparing.
        let mut fetched = header_downloader.next().await.unwrap().unwrap();
        fetched.reverse();

        // The local head itself is not part of the response.
        assert_eq!(fetched, headers[1..]);
    }

    /// Bodies decoded from a generated chain file match the generated bodies.
    #[tokio::test]
    async fn test_download_bodies_from_file() {
        // Write random blocks to a file.
        let provider_factory = create_test_provider_factory();
        let (file, headers, mut bodies) = generate_bodies_file(0..=19).await;

        // Read them back.
        let client =
            Arc::new(FileClient::<Block>::from_file(file, NoopConsensus::arc()).await.unwrap());

        // Persist the headers so the bodies downloader can resolve them.
        insert_headers(provider_factory.db_ref().db(), &headers);

        let mut downloader = BodiesDownloaderBuilder::default().build::<Block, _, _>(
            Arc::clone(&client),
            Arc::new(TestConsensus::default()),
            provider_factory,
        );
        downloader.set_download_range(0..=19).expect("failed to set download range");

        assert_matches!(
            downloader.next().await,
            Some(Ok(res)) => assert_eq!(res, zip_blocks(headers.iter(), &mut bodies))
        );
    }

    /// Reading a chain file in randomly sized chunks yields every header except the first.
    #[tokio::test]
    async fn test_chunk_download_headers_from_file() {
        reth_tracing::init_test_tracing();

        // Write random blocks to a file.
        let (file, headers, _) = generate_bodies_file(0..=14).await;

        // The lower bound is presumably chosen so that a chunk can hold at least one encoded
        // block. TODO: confirm against the generated block sizes.
        let chunk_byte_len = rand::rng().random_range(2000..=10_000);
        trace!(target: "downloaders::file::test", chunk_byte_len);

        let mut reader = ChunkedFileReader::from_file(file, chunk_byte_len as u64).await.unwrap();

        let mut collected: Vec<SealedHeader> = vec![];
        let mut local_head = headers.first().unwrap().clone();

        while let Some(client) =
            reader.next_chunk::<Block>(NoopConsensus::arc(), None).await.unwrap()
        {
            // Sync each chunk up to its own highest header.
            let chunk_tip = client.tip_header().unwrap();
            let chunk_tip_hash = chunk_tip.hash();

            let mut header_downloader = ReverseHeadersDownloaderBuilder::default()
                .build(Arc::new(client), Arc::new(TestConsensus::default()));
            header_downloader.update_local_head(local_head.clone());
            header_downloader.update_sync_target(SyncTarget::Tip(chunk_tip_hash));

            let mut chunk_headers = header_downloader.next().await.unwrap().unwrap();

            // The next chunk starts where this one ended.
            local_head = chunk_tip;

            // Headers arrive tip-first; store them in ascending order.
            chunk_headers.reverse();
            collected.extend_from_slice(&chunk_headers);
        }

        // The very first header is the initial local head and is never downloaded.
        assert_eq!(headers[1..], collected);
    }
}