1use alloy_consensus::BlockHeader;
2use alloy_eips::BlockHashOrNumber;
3use alloy_primitives::{BlockHash, BlockNumber, Sealable, B256};
4use futures::Future;
5use itertools::Either;
6use reth_consensus::{Consensus, ConsensusError};
7use reth_network_p2p::{
8 bodies::client::{BodiesClient, BodiesFut},
9 download::DownloadClient,
10 error::RequestError,
11 headers::client::{HeadersClient, HeadersDirection, HeadersFut, HeadersRequest},
12 priority::Priority,
13 BlockClient,
14};
15use reth_network_peers::PeerId;
16use reth_primitives_traits::{Block, BlockBody, FullBlock, SealedBlock, SealedHeader};
17use std::{collections::HashMap, io, ops::RangeInclusive, path::Path, sync::Arc};
18use thiserror::Error;
19use tokio::{fs::File, io::AsyncReadExt};
20use tokio_stream::StreamExt;
21use tokio_util::codec::FramedRead;
22use tracing::{debug, trace, warn};
23
24use super::file_codec::BlockFileCodec;
25use crate::receipt_file_client::FromReceiptReader;
26
/// Default size, in bytes, of one chunk read from a chain file.
///
/// [`ChunkedFileReader::new`] falls back to this value (`1_000_000_000` bytes) when the caller
/// does not provide a chunk length.
pub const DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE: u64 = 1_000_000_000;
31
32#[derive(Debug, Clone)]
44pub struct FileClient<B: Block> {
45 headers: HashMap<BlockNumber, B::Header>,
47
48 hash_to_number: HashMap<BlockHash, BlockNumber>,
50
51 bodies: HashMap<BlockHash, B::Body>,
53}
54
55#[derive(Debug, Error)]
57pub enum FileClientError {
58 #[error(transparent)]
60 Consensus(#[from] ConsensusError),
61
62 #[error(transparent)]
64 Io(#[from] std::io::Error),
65
66 #[error("{0}")]
68 Rlp(alloy_rlp::Error, Vec<u8>),
69
70 #[error("{0}")]
72 Custom(&'static str),
73}
74
75impl From<&'static str> for FileClientError {
76 fn from(value: &'static str) -> Self {
77 Self::Custom(value)
78 }
79}
80
81impl<B: FullBlock> FileClient<B> {
82 pub async fn new<P: AsRef<Path>>(
84 path: P,
85 consensus: Arc<dyn Consensus<B, Error = ConsensusError>>,
86 ) -> Result<Self, FileClientError> {
87 let file = File::open(path).await?;
88 Self::from_file(file, consensus).await
89 }
90
91 pub(crate) async fn from_file(
93 mut file: File,
94 consensus: Arc<dyn Consensus<B, Error = ConsensusError>>,
95 ) -> Result<Self, FileClientError> {
96 let metadata = file.metadata().await?;
98 let file_len = metadata.len();
99
100 let mut reader = vec![];
101 file.read_to_end(&mut reader).await?;
102
103 Ok(FileClientBuilder { consensus, parent_header: None }
104 .build(&reader[..], file_len)
105 .await?
106 .file_client)
107 }
108
109 pub fn tip(&self) -> Option<B256> {
111 self.headers.get(&self.max_block()?).map(|h| h.hash_slow())
112 }
113
114 pub fn start(&self) -> Option<B256> {
116 self.headers.get(&self.min_block()?).map(|h| h.hash_slow())
117 }
118
119 pub fn max_block(&self) -> Option<u64> {
121 self.headers.keys().max().copied()
122 }
123
124 pub fn min_block(&self) -> Option<u64> {
126 self.headers.keys().min().copied()
127 }
128
129 pub fn tip_header(&self) -> Option<SealedHeader<B::Header>> {
132 self.headers.get(&self.max_block()?).map(|h| SealedHeader::seal_slow(h.clone()))
133 }
134
135 pub fn has_canonical_blocks(&self) -> bool {
137 if self.headers.is_empty() {
138 return true
139 }
140 let mut nums = self.headers.keys().copied().collect::<Vec<_>>();
141 nums.sort_unstable();
142 let mut iter = nums.into_iter();
143 let mut lowest = iter.next().expect("not empty");
144 for next in iter {
145 if next != lowest + 1 {
146 return false
147 }
148 lowest = next;
149 }
150 true
151 }
152
153 pub fn with_bodies(mut self, bodies: HashMap<BlockHash, B::Body>) -> Self {
155 self.bodies = bodies;
156 self
157 }
158
159 pub fn with_headers(mut self, headers: HashMap<BlockNumber, B::Header>) -> Self {
161 self.headers = headers;
162 for (number, header) in &self.headers {
163 self.hash_to_number.insert(header.hash_slow(), *number);
164 }
165 self
166 }
167
168 pub fn headers_len(&self) -> usize {
170 self.headers.len()
171 }
172
173 pub fn bodies_len(&self) -> usize {
175 self.bodies.len()
176 }
177
178 pub fn headers_iter(&self) -> impl Iterator<Item = &B::Header> {
180 self.headers.values()
181 }
182
183 pub fn bodies_iter_mut(&mut self) -> impl Iterator<Item = (u64, &mut B::Body)> {
187 let bodies = &mut self.bodies;
188 let numbers = &self.hash_to_number;
189 bodies.iter_mut().map(|(hash, body)| (numbers[hash], body))
190 }
191
192 pub fn total_transactions(&self) -> usize {
194 self.bodies.iter().fold(0, |acc, (_, body)| acc + body.transactions().len())
195 }
196}
197
198struct FileClientBuilder<B: Block> {
199 pub consensus: Arc<dyn Consensus<B, Error = ConsensusError>>,
200 pub parent_header: Option<SealedHeader<B::Header>>,
201}
202
203impl<B: FullBlock<Header: reth_primitives_traits::BlockHeader>> FromReader
204 for FileClientBuilder<B>
205{
206 type Error = FileClientError;
207 type Output = FileClient<B>;
208
209 fn build<R>(
211 &self,
212 reader: R,
213 num_bytes: u64,
214 ) -> impl Future<Output = Result<DecodedFileChunk<Self::Output>, Self::Error>>
215 where
216 R: AsyncReadExt + Unpin,
217 {
218 let mut headers = HashMap::default();
219 let mut hash_to_number = HashMap::default();
220 let mut bodies = HashMap::default();
221
222 let mut stream =
224 FramedRead::with_capacity(reader, BlockFileCodec::<B>::default(), num_bytes as usize);
225
226 trace!(target: "downloaders::file",
227 target_num_bytes=num_bytes,
228 capacity=stream.read_buffer().capacity(),
229 "init decode stream"
230 );
231
232 let mut remaining_bytes = vec![];
233
234 let mut log_interval = 0;
235 let mut log_interval_start_block = 0;
236
237 let mut parent_header = self.parent_header.clone();
238
239 async move {
240 while let Some(block_res) = stream.next().await {
241 let block = match block_res {
242 Ok(block) => block,
243 Err(FileClientError::Rlp(err, bytes)) => {
244 trace!(target: "downloaders::file",
245 %err,
246 bytes_len=bytes.len(),
247 "partial block returned from decoding chunk"
248 );
249 remaining_bytes = bytes;
250 break
251 }
252 Err(err) => return Err(err),
253 };
254
255 let block = SealedBlock::seal_slow(block);
256
257 self.consensus.validate_header(block.sealed_header())?;
259 if let Some(parent) = &parent_header {
260 self.consensus.validate_header_against_parent(block.sealed_header(), parent)?;
261 parent_header = Some(block.sealed_header().clone());
262 }
263
264 self.consensus.validate_block_pre_execution(&block)?;
266
267 let block_hash = block.hash();
269 let block_number = block.number();
270 let (header, body) = block.split_sealed_header_body();
271 headers.insert(block_number, header.unseal());
272 hash_to_number.insert(block_hash, block_number);
273 bodies.insert(block_hash, body);
274
275 if log_interval == 0 {
276 trace!(target: "downloaders::file",
277 block_number,
278 "read first block"
279 );
280 log_interval_start_block = block_number;
281 } else if log_interval % 100_000 == 0 {
282 trace!(target: "downloaders::file",
283 blocks=?log_interval_start_block..=block_number,
284 "read blocks from file"
285 );
286 log_interval_start_block = block_number + 1;
287 }
288 log_interval += 1;
289 }
290
291 trace!(target: "downloaders::file", blocks = headers.len(), "Initialized file client");
292
293 Ok(DecodedFileChunk {
294 file_client: FileClient { headers, hash_to_number, bodies },
295 remaining_bytes,
296 highest_block: None,
297 })
298 }
299 }
300}
301
302impl<B: FullBlock> HeadersClient for FileClient<B> {
303 type Header = B::Header;
304 type Output = HeadersFut<B::Header>;
305
306 fn get_headers_with_priority(
307 &self,
308 request: HeadersRequest,
309 _priority: Priority,
310 ) -> Self::Output {
311 let mut headers = Vec::new();
313 trace!(target: "downloaders::file", request=?request, "Getting headers");
314
315 let start_num = match request.start {
316 BlockHashOrNumber::Hash(hash) => match self.hash_to_number.get(&hash) {
317 Some(num) => *num,
318 None => {
319 warn!(%hash, "Could not find starting block number for requested header hash");
320 return Box::pin(async move { Err(RequestError::BadResponse) })
321 }
322 },
323 BlockHashOrNumber::Number(num) => num,
324 };
325
326 let range = if request.limit == 1 {
327 Either::Left(start_num..start_num + 1)
328 } else {
329 match request.direction {
330 HeadersDirection::Rising => Either::Left(start_num..start_num + request.limit),
331 HeadersDirection::Falling => {
332 Either::Right((start_num - request.limit + 1..=start_num).rev())
333 }
334 }
335 };
336
337 trace!(target: "downloaders::file", range=?range, "Getting headers with range");
338
339 for block_number in range {
340 match self.headers.get(&block_number).cloned() {
341 Some(header) => headers.push(header),
342 None => {
343 warn!(number=%block_number, "Could not find header");
344 return Box::pin(async move { Err(RequestError::BadResponse) })
345 }
346 }
347 }
348
349 Box::pin(async move { Ok((PeerId::default(), headers).into()) })
350 }
351}
352
353impl<B: FullBlock> BodiesClient for FileClient<B> {
354 type Body = B::Body;
355 type Output = BodiesFut<B::Body>;
356
357 fn get_block_bodies_with_priority_and_range_hint(
358 &self,
359 hashes: Vec<B256>,
360 _priority: Priority,
361 _range_hint: Option<RangeInclusive<u64>>,
362 ) -> Self::Output {
363 let mut bodies = Vec::new();
365
366 for hash in hashes {
369 match self.bodies.get(&hash).cloned() {
370 Some(body) => bodies.push(body),
371 None => return Box::pin(async move { Err(RequestError::BadResponse) }),
372 }
373 }
374
375 Box::pin(async move { Ok((PeerId::default(), bodies).into()) })
376 }
377}
378
379impl<B: FullBlock> DownloadClient for FileClient<B> {
380 fn report_bad_message(&self, _peer_id: PeerId) {
381 trace!("Reported a bad message on a file client, the file may be corrupted or invalid");
382 }
384
385 fn num_connected_peers(&self) -> usize {
386 1
388 }
389}
390
391impl<B: FullBlock> BlockClient for FileClient<B> {
392 type Block = B;
393}
394
395#[derive(Debug)]
397pub struct ChunkedFileReader {
398 file: File,
400 file_byte_len: u64,
402 chunk: Vec<u8>,
404 chunk_byte_len: u64,
406 highest_block: Option<u64>,
409}
410
411impl ChunkedFileReader {
412 pub const fn file_len(&self) -> u64 {
414 self.file_byte_len
415 }
416
417 pub async fn new<P: AsRef<Path>>(
420 path: P,
421 chunk_byte_len: Option<u64>,
422 ) -> Result<Self, FileClientError> {
423 let file = File::open(path).await?;
424 let chunk_byte_len = chunk_byte_len.unwrap_or(DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE);
425
426 Self::from_file(file, chunk_byte_len).await
427 }
428
429 pub async fn from_file(file: File, chunk_byte_len: u64) -> Result<Self, FileClientError> {
431 let metadata = file.metadata().await?;
433 let file_byte_len = metadata.len();
434
435 Ok(Self { file, file_byte_len, chunk: vec![], chunk_byte_len, highest_block: None })
436 }
437
438 const fn chunk_len(&self) -> u64 {
441 let Self { chunk_byte_len, file_byte_len, .. } = *self;
442 let file_byte_len = file_byte_len + self.chunk.len() as u64;
443
444 if chunk_byte_len > file_byte_len {
445 file_byte_len
447 } else {
448 chunk_byte_len
449 }
450 }
451
452 async fn read_next_chunk(&mut self) -> Result<Option<u64>, io::Error> {
455 if self.file_byte_len == 0 && self.chunk.is_empty() {
456 return Ok(None)
458 }
459
460 let chunk_target_len = self.chunk_len();
461 let old_bytes_len = self.chunk.len() as u64;
462
463 let new_read_bytes_target_len = chunk_target_len - old_bytes_len;
465
466 let prev_read_bytes_len = self.chunk.len();
468 self.chunk.extend(std::iter::repeat_n(0, new_read_bytes_target_len as usize));
469 let reader = &mut self.chunk[prev_read_bytes_len..];
470
471 let new_read_bytes_len = self.file.read_exact(reader).await? as u64;
473 let next_chunk_byte_len = self.chunk.len();
474
475 self.file_byte_len -= new_read_bytes_len;
477
478 debug!(target: "downloaders::file",
479 max_chunk_byte_len=self.chunk_byte_len,
480 prev_read_bytes_len,
481 new_read_bytes_target_len,
482 new_read_bytes_len,
483 next_chunk_byte_len,
484 remaining_file_byte_len=self.file_byte_len,
485 "new bytes were read from file"
486 );
487
488 Ok(Some(next_chunk_byte_len as u64))
489 }
490
491 pub async fn next_chunk<B: FullBlock>(
493 &mut self,
494 consensus: Arc<dyn Consensus<B, Error = ConsensusError>>,
495 parent_header: Option<SealedHeader<B::Header>>,
496 ) -> Result<Option<FileClient<B>>, FileClientError> {
497 let Some(next_chunk_byte_len) = self.read_next_chunk().await? else { return Ok(None) };
498
499 let DecodedFileChunk { file_client, remaining_bytes, .. } =
501 FileClientBuilder { consensus, parent_header }
502 .build(&self.chunk[..], next_chunk_byte_len)
503 .await?;
504
505 self.chunk = remaining_bytes;
507
508 Ok(Some(file_client))
509 }
510
511 pub async fn next_receipts_chunk<T>(&mut self) -> Result<Option<T>, T::Error>
513 where
514 T: FromReceiptReader,
515 {
516 let Some(next_chunk_byte_len) = self.read_next_chunk().await? else { return Ok(None) };
517
518 let DecodedFileChunk { file_client, remaining_bytes, highest_block } =
520 T::from_receipt_reader(&self.chunk[..], next_chunk_byte_len, self.highest_block)
521 .await?;
522
523 self.chunk = remaining_bytes;
525 self.highest_block = highest_block;
527
528 Ok(Some(file_client))
529 }
530}
531
532pub trait FromReader {
534 type Error: From<io::Error>;
536
537 type Output;
539
540 fn build<R>(
542 &self,
543 reader: R,
544 num_bytes: u64,
545 ) -> impl Future<Output = Result<DecodedFileChunk<Self::Output>, Self::Error>>
546 where
547 Self: Sized,
548 R: AsyncReadExt + Unpin;
549}
550
/// Result of decoding one chunk of a file.
#[derive(Debug)]
pub struct DecodedFileChunk<T> {
    /// The value decoded from the chunk.
    pub file_client: T,
    /// Bytes of the chunk that were not decoded; [`ChunkedFileReader`] carries them over into
    /// the next chunk.
    pub remaining_bytes: Vec<u8>,
    /// Highest block number reported by the decoder, if it tracks one. The block decoder in
    /// this file always sets `None`.
    pub highest_block: Option<u64>,
}
562
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        bodies::{
            bodies::BodiesDownloaderBuilder,
            test_utils::{insert_headers, zip_blocks},
        },
        headers::{reverse_headers::ReverseHeadersDownloaderBuilder, test_utils::child_header},
        test_utils::{generate_bodies, generate_bodies_file},
    };
    use assert_matches::assert_matches;
    use futures_util::stream::StreamExt;
    use rand::Rng;
    use reth_consensus::{noop::NoopConsensus, test_utils::TestConsensus};
    use reth_ethereum_primitives::Block;
    use reth_network_p2p::{
        bodies::downloader::BodyDownloader,
        headers::downloader::{HeaderDownloader, SyncTarget},
    };
    use reth_provider::test_utils::create_test_provider_factory;
    use std::sync::Arc;

    /// Bodies injected with `with_bodies` are served to the bodies downloader.
    #[tokio::test]
    async fn streams_bodies_from_buffer() {
        // generate blocks and store their headers in the test database
        let factory = create_test_provider_factory();
        let (headers, mut bodies) = generate_bodies(0..=19);

        insert_headers(factory.db_ref().db(), &headers);

        // an empty file: the client content comes from `with_bodies` only
        let file = tempfile::tempfile().unwrap();

        let client: Arc<FileClient<Block>> = Arc::new(
            FileClient::from_file(file.into(), NoopConsensus::arc())
                .await
                .unwrap()
                .with_bodies(bodies.clone()),
        );
        let mut downloader = BodiesDownloaderBuilder::default().build::<Block, _, _>(
            client.clone(),
            Arc::new(TestConsensus::default()),
            factory,
        );
        downloader.set_download_range(0..=19).expect("failed to set download range");

        assert_matches!(
            downloader.next().await,
            Some(Ok(res)) => assert_eq!(res, zip_blocks(headers.iter(), &mut bodies))
        );
    }

    /// Headers injected with `with_headers` are served to the reverse headers downloader.
    #[tokio::test]
    async fn download_headers_at_fork_head() {
        reth_tracing::init_test_tracing();

        // a short chain of four headers, each the child of the previous one
        let p3 = SealedHeader::default();
        let p2 = child_header(&p3);
        let p1 = child_header(&p2);
        let p0 = child_header(&p1);

        let file = tempfile::tempfile().unwrap();
        let client: Arc<FileClient<Block>> = Arc::new(
            FileClient::from_file(file.into(), NoopConsensus::arc()).await.unwrap().with_headers(
                HashMap::from([
                    (0u64, p0.clone_header()),
                    (1, p1.clone_header()),
                    (2, p2.clone_header()),
                    (3, p3.clone_header()),
                ]),
            ),
        );

        let mut downloader = ReverseHeadersDownloaderBuilder::default()
            .stream_batch_size(3)
            .request_limit(3)
            .build(Arc::clone(&client), Arc::new(TestConsensus::default()));
        downloader.update_local_head(p3.clone());
        downloader.update_sync_target(SyncTarget::Tip(p0.hash()));

        let headers = downloader.next().await.unwrap();
        assert_eq!(headers, Ok(vec![p0, p1, p2]));
        assert!(downloader.next().await.is_none());
        assert!(downloader.next().await.is_none());
    }

    /// Headers decoded from a generated file are served to the reverse headers downloader.
    #[tokio::test]
    async fn test_download_headers_from_file() {
        reth_tracing::init_test_tracing();

        // generate a file with blocks 0..=19
        let (file, headers, _) = generate_bodies_file(0..=19).await;

        // build the client from that file
        let client: Arc<FileClient<Block>> =
            Arc::new(FileClient::from_file(file, NoopConsensus::arc()).await.unwrap());

        // sync from the first generated header up to the last one
        let mut header_downloader = ReverseHeadersDownloaderBuilder::default()
            .build(Arc::clone(&client), Arc::new(TestConsensus::default()));
        header_downloader.update_local_head(headers.first().unwrap().clone());
        header_downloader.update_sync_target(SyncTarget::Tip(headers.last().unwrap().hash()));

        // get headers first
        let mut downloaded_headers = header_downloader.next().await.unwrap().unwrap();

        // the downloader yields headers in reverse order
        downloaded_headers.reverse();

        // the local head itself is not part of the download
        assert_eq!(downloaded_headers, headers[1..]);
    }

    /// Bodies decoded from a generated file are served to the bodies downloader.
    #[tokio::test]
    async fn test_download_bodies_from_file() {
        // generate a file with blocks 0..=19
        let factory = create_test_provider_factory();
        let (file, headers, mut bodies) = generate_bodies_file(0..=19).await;

        // build the client from that file
        let client: Arc<FileClient<Block>> =
            Arc::new(FileClient::from_file(file, NoopConsensus::arc()).await.unwrap());

        // the bodies downloader looks the headers up in the database
        insert_headers(factory.db_ref().db(), &headers);

        let mut downloader = BodiesDownloaderBuilder::default().build::<Block, _, _>(
            client.clone(),
            Arc::new(TestConsensus::default()),
            factory,
        );
        downloader.set_download_range(0..=19).expect("failed to set download range");

        assert_matches!(
            downloader.next().await,
            Some(Ok(res)) => assert_eq!(res, zip_blocks(headers.iter(), &mut bodies))
        );
    }

    /// Reading a generated file in randomly sized chunks yields all headers in order.
    #[tokio::test]
    async fn test_chunk_download_headers_from_file() {
        reth_tracing::init_test_tracing();

        // generate a file with blocks 0..=14
        let (file, headers, _) = generate_bodies_file(0..=14).await;

        // pick a random chunk size so that block boundaries fall at varying offsets
        let chunk_byte_len = rand::rng().random_range(2000..=10_000);
        trace!(target: "downloaders::file::test", chunk_byte_len);

        // init the chunked reader
        let mut reader = ChunkedFileReader::from_file(file, chunk_byte_len as u64).await.unwrap();

        let mut downloaded_headers: Vec<SealedHeader> = vec![];

        let mut local_header = headers.first().unwrap().clone();

        // one downloader run per chunk
        while let Some(client) =
            reader.next_chunk::<Block>(NoopConsensus::arc(), None).await.unwrap()
        {
            // the tip of this chunk is the sync target for this round
            let sync_target = client.tip_header().unwrap();
            let sync_target_hash = sync_target.hash();

            let mut header_downloader = ReverseHeadersDownloaderBuilder::default()
                .build(Arc::new(client), Arc::new(TestConsensus::default()));
            header_downloader.update_local_head(local_header.clone());
            header_downloader.update_sync_target(SyncTarget::Tip(sync_target_hash));

            // get headers first
            let mut downloaded_headers_chunk = header_downloader.next().await.unwrap().unwrap();

            // the next round continues from this chunk's tip
            local_header = sync_target;

            // the downloader yields headers in reverse order
            downloaded_headers_chunk.reverse();
            downloaded_headers.extend_from_slice(&downloaded_headers_chunk);
        }

        // the first header was the initial local head and is not downloaded
        assert_eq!(headers[1..], downloaded_headers);
    }
}