1use super::task::TaskDownloader;
4use crate::metrics::HeaderDownloaderMetrics;
5use alloy_consensus::BlockHeader;
6use alloy_eips::BlockHashOrNumber;
7use alloy_primitives::{BlockNumber, Sealable, B256};
8use futures::{stream::Stream, FutureExt};
9use futures_util::{stream::FuturesUnordered, StreamExt};
10use rayon::prelude::*;
11use reth_config::config::HeadersConfig;
12use reth_consensus::HeaderValidator;
13use reth_network_p2p::{
14 error::{DownloadError, DownloadResult, PeerRequestResult},
15 headers::{
16 client::{HeadersClient, HeadersRequest},
17 downloader::{validate_header_download, HeaderDownloader, SyncTarget},
18 error::{HeadersDownloaderError, HeadersDownloaderResult},
19 },
20 priority::Priority,
21};
22use reth_network_peers::PeerId;
23use reth_primitives_traits::{GotExpected, SealedHeader};
24use reth_tasks::Runtime;
25use std::{
26 cmp::{Ordering, Reverse},
27 collections::{binary_heap::PeekMut, BinaryHeap},
28 future::Future,
29 pin::Pin,
30 sync::Arc,
31 task::{ready, Context, Poll},
32};
33use thiserror::Error;
34use tracing::{debug, error, trace};
35
/// Factor applied to the number of connected peers when computing the dynamic target for
/// concurrently active header requests.
const REQUESTS_PER_PEER_MULTIPLIER: usize = 5;
40
/// Internal error type used by the reverse headers downloader to separate errors that are
/// surfaced to the stream consumer from errors tied to a single request/response.
#[derive(Error, Debug)]
enum ReverseHeadersDownloaderError<H: Sealable> {
    /// A downloader-level error; `poll_next` clears the downloader state and yields it.
    #[error(transparent)]
    Downloader(#[from] HeadersDownloaderError<H>),
    /// A failed request or invalid response (boxed); `poll_next` re-submits the request.
    #[error(transparent)]
    Response(#[from] Box<HeadersResponseError>),
}
49
50impl<H: Sealable> From<HeadersResponseError> for ReverseHeadersDownloaderError<H> {
51 fn from(value: HeadersResponseError) -> Self {
52 Self::Response(Box::new(value))
53 }
54}
55
/// Downloads headers concurrently in reverse: requests start at the sync target and walk
/// down (falling block numbers) towards the local head. Validated headers are yielded in
/// batches through the `Stream` implementation.
#[must_use = "Stream does nothing unless polled"]
#[derive(Debug)]
pub struct ReverseHeadersDownloader<H: HeadersClient> {
    /// Validator used to check downloaded headers.
    consensus: Arc<dyn HeaderValidator<H::Header>>,
    /// Client used to request headers and to report misbehaving peers.
    client: Arc<H>,
    /// The local head; the stream stays pending until it is set.
    local_head: Option<SealedHeader<H::Header>>,
    /// The block to sync to; the stream stays pending until it is set.
    sync_target: Option<SyncTargetBlock>,
    /// Start block number of the next request to issue; lowered by each issued request's
    /// limit.
    next_request_block_number: u64,
    /// Copy of the lowest header of the last yielded batch, kept when that batch drained the
    /// queue so that follow-up responses can still be validated against it.
    lowest_validated_header: Option<SealedHeader<H::Header>>,
    /// Block number the next response to process must start at.
    next_chain_tip_block_number: u64,
    /// Maximum number of headers asked for in a single request.
    request_limit: u64,
    /// Lower bound used when computing the concurrent request limit.
    min_concurrent_requests: usize,
    /// Upper bound used when computing the concurrent request limit.
    max_concurrent_requests: usize,
    /// Number of validated headers that triggers yielding a batch (also the maximum batch
    /// size).
    stream_batch_size: usize,
    /// No new requests are issued while at least this many responses are buffered.
    max_buffered_responses: usize,
    /// In-flight request for the sync target header, polled before any other request.
    sync_target_request: Option<HeadersRequestFuture<H::Output>>,
    /// In-flight header requests.
    in_progress_queue: FuturesUnordered<HeadersRequestFuture<H::Output>>,
    /// Responses that arrived before the response they depend on, ordered by start block.
    buffered_responses: BinaryHeap<OrderedHeadersResponse<H::Header>>,
    /// Validated headers waiting to be yielded; the lowest block number is last.
    queued_validated_headers: Vec<SealedHeader<H::Header>>,
    /// Downloader metrics.
    metrics: HeaderDownloaderMetrics,
}
110
111impl<H> ReverseHeadersDownloader<H>
114where
115 H: HeadersClient<Header: reth_primitives_traits::BlockHeader> + 'static,
116{
117 pub fn builder() -> ReverseHeadersDownloaderBuilder {
119 ReverseHeadersDownloaderBuilder::default()
120 }
121
122 #[inline]
124 fn local_block_number(&self) -> Option<BlockNumber> {
125 self.local_head.as_ref().map(|h| h.number())
126 }
127
128 #[inline]
134 fn existing_local_block_number(&self) -> BlockNumber {
135 self.local_head.as_ref().expect("is initialized").number()
136 }
137
138 #[inline]
144 fn existing_sync_target(&self) -> SyncTargetBlock {
145 self.sync_target.as_ref().expect("is initialized").clone()
146 }
147
148 #[inline]
153 fn concurrent_request_limit(&self) -> usize {
154 let num_peers = self.client.num_connected_peers();
155
156 let dynamic_target = num_peers * REQUESTS_PER_PEER_MULTIPLIER;
159 let max_dynamic = dynamic_target.max(self.min_concurrent_requests);
160
161 if num_peers < self.min_concurrent_requests {
163 return max_dynamic
164 }
165
166 max_dynamic.min(self.max_concurrent_requests)
167 }
168
169 fn next_request(&mut self) -> Option<HeadersRequest> {
175 if let Some(local_head) = self.local_block_number() &&
176 self.next_request_block_number > local_head
177 {
178 let request =
179 calc_next_request(local_head, self.next_request_block_number, self.request_limit);
180 self.next_request_block_number -= request.limit;
183
184 return Some(request)
185 }
186
187 None
188 }
189
190 fn lowest_validated_header(&self) -> Option<&SealedHeader<H::Header>> {
200 self.queued_validated_headers.last().or(self.lowest_validated_header.as_ref())
201 }
202
203 fn reset(&mut self) {
207 debug!(target: "downloaders::headers", "Resetting headers downloader");
208 self.next_request_block_number = 0;
209 self.next_chain_tip_block_number = 0;
210 self.sync_target.take();
211 }
212
213 fn validate_sync_target(
215 &self,
216 header: &SealedHeader<H::Header>,
217 request: HeadersRequest,
218 peer_id: PeerId,
219 ) -> Result<(), Box<HeadersResponseError>> {
220 match self.existing_sync_target() {
221 SyncTargetBlock::Hash(hash) | SyncTargetBlock::HashAndNumber { hash, .. }
222 if header.hash() != hash =>
223 {
224 Err(Box::new(HeadersResponseError {
225 request,
226 peer_id: Some(peer_id),
227 error: DownloadError::InvalidTip(
228 GotExpected { got: header.hash(), expected: hash }.into(),
229 ),
230 }))
231 }
232 SyncTargetBlock::Number(number) if header.number() != number => {
233 Err(Box::new(HeadersResponseError {
234 request,
235 peer_id: Some(peer_id),
236 error: DownloadError::InvalidTipNumber(GotExpected {
237 got: header.number(),
238 expected: number,
239 }),
240 }))
241 }
242 _ => Ok(()),
243 }
244 }
245
246 fn process_next_headers(
254 &mut self,
255 request: HeadersRequest,
256 headers: Vec<H::Header>,
257 peer_id: PeerId,
258 ) -> Result<(), ReverseHeadersDownloaderError<H::Header>> {
259 let mut validated = Vec::with_capacity(headers.len());
260
261 let sealed_headers =
262 headers.into_par_iter().map(SealedHeader::seal_slow).collect::<Vec<_>>();
263 for parent in sealed_headers {
264 if let Some(validated_header) =
266 validated.last().or_else(|| self.lowest_validated_header())
267 {
268 if let Err(error) = self.validate(validated_header, &parent) {
269 trace!(target: "downloaders::headers", %error ,"Failed to validate header");
270 return Err(
271 HeadersResponseError { request, peer_id: Some(peer_id), error }.into()
272 )
273 }
274 } else {
275 self.validate_sync_target(&parent, request.clone(), peer_id)?;
276 }
277
278 validated.push(parent);
279 }
280
281 if let Some((last_header, head)) = validated
283 .last_mut()
284 .zip(self.local_head.as_ref())
285 .filter(|(last, head)| last.number() == head.number() + 1)
286 {
287 if let Err(error) = self.consensus.validate_header(&*last_header) {
289 trace!(target: "downloaders::headers", %error, "Failed to validate header");
290 return Err(HeadersResponseError {
291 request,
292 peer_id: Some(peer_id),
293 error: DownloadError::HeaderValidation {
294 hash: head.hash(),
295 number: head.number(),
296 error: Box::new(error),
297 },
298 }
299 .into())
300 }
301
302 if let Err(error) = self.consensus.validate_header_against_parent(&*last_header, head) {
309 let local_head = head.clone();
310 error!(target: "downloaders::headers", %error, number = last_header.number(), hash = ?last_header.hash(), "Header cannot be attached to known canonical chain");
312
313 self.reset();
318
319 return Err(HeadersDownloaderError::DetachedHead {
320 local_head: Box::new(local_head),
321 header: Box::new(last_header.clone()),
322 error: Box::new(error),
323 }
324 .into())
325 }
326 }
327
328 self.next_chain_tip_block_number =
330 validated.last().expect("exists").number().saturating_sub(1);
331 self.queued_validated_headers.extend(validated);
332
333 Ok(())
334 }
335
336 fn on_block_number_update(&mut self, target_block_number: u64, next_block: u64) {
347 if let Some(old_target) =
349 self.sync_target.as_mut().and_then(|t| t.replace_number(target_block_number))
350 {
351 if target_block_number > old_target {
352 self.next_request_block_number = next_block;
355 self.next_chain_tip_block_number = next_block;
356 self.clear();
357 } else {
358 let skip = self
360 .queued_validated_headers
361 .iter()
362 .take_while(|last| last.number() > target_block_number)
363 .count();
364 self.queued_validated_headers.drain(..skip);
366 }
367 } else {
368 self.next_request_block_number = next_block;
370 self.next_chain_tip_block_number = next_block;
371 }
372 }
373
374 fn on_sync_target_outcome(
376 &mut self,
377 response: HeadersRequestOutcome<H::Header>,
378 ) -> Result<(), ReverseHeadersDownloaderError<H::Header>> {
379 let sync_target = self.existing_sync_target();
380 let HeadersRequestOutcome { request, outcome } = response;
381 match outcome {
382 Ok(res) => {
383 let (peer_id, mut headers) = res.split();
384
385 self.metrics.total_downloaded.increment(headers.len() as u64);
387
388 headers.sort_unstable_by_key(|h| Reverse(h.number()));
390
391 if headers.is_empty() {
392 return Err(HeadersResponseError {
393 request,
394 peer_id: Some(peer_id),
395 error: DownloadError::EmptyResponse,
396 }
397 .into())
398 }
399
400 let header = headers.swap_remove(0);
401 let target = SealedHeader::seal_slow(header);
402
403 match sync_target {
404 SyncTargetBlock::Hash(hash) | SyncTargetBlock::HashAndNumber { hash, .. } => {
405 if target.hash() != hash {
406 return Err(HeadersResponseError {
407 request,
408 peer_id: Some(peer_id),
409 error: DownloadError::InvalidTip(
410 GotExpected { got: target.hash(), expected: hash }.into(),
411 ),
412 }
413 .into())
414 }
415 }
416 SyncTargetBlock::Number(number) => {
417 if target.number() != number {
418 return Err(HeadersResponseError {
419 request,
420 peer_id: Some(peer_id),
421 error: DownloadError::InvalidTipNumber(GotExpected {
422 got: target.number(),
423 expected: number,
424 }),
425 }
426 .into())
427 }
428 }
429 }
430
431 trace!(target: "downloaders::headers", head=?self.local_block_number(), hash=?target.hash(), number=%target.number(), "Received sync target");
432
433 let parent_block_number = target.number().saturating_sub(1);
435 self.on_block_number_update(target.number(), parent_block_number);
436
437 self.queued_validated_headers.push(target);
438
439 self.try_validate_buffered()
441 .map(Err::<(), ReverseHeadersDownloaderError<H::Header>>)
442 .transpose()?;
443
444 Ok(())
445 }
446 Err(err) => {
447 Err(HeadersResponseError { request, peer_id: None, error: err.into() }.into())
448 }
449 }
450 }
451
452 fn on_headers_outcome(
454 &mut self,
455 response: HeadersRequestOutcome<H::Header>,
456 ) -> Result<(), ReverseHeadersDownloaderError<H::Header>> {
457 let requested_block_number = response.block_number();
458 let HeadersRequestOutcome { request, outcome } = response;
459
460 match outcome {
461 Ok(res) => {
462 let (peer_id, mut headers) = res.split();
463
464 self.metrics.total_downloaded.increment(headers.len() as u64);
466
467 trace!(target: "downloaders::headers", len=%headers.len(), "Received headers response");
468
469 if headers.is_empty() {
470 return Err(HeadersResponseError {
471 request,
472 peer_id: Some(peer_id),
473 error: DownloadError::EmptyResponse,
474 }
475 .into())
476 }
477
478 if (headers.len() as u64) != request.limit {
479 return Err(HeadersResponseError {
480 peer_id: Some(peer_id),
481 error: DownloadError::HeadersResponseTooShort(GotExpected {
482 got: headers.len() as u64,
483 expected: request.limit,
484 }),
485 request,
486 }
487 .into())
488 }
489
490 headers.sort_unstable_by_key(|h| Reverse(h.number()));
492
493 let highest = &headers[0];
495
496 trace!(target: "downloaders::headers", requested_block_number, highest=?highest.number(), "Validating non-empty headers response");
497
498 if highest.number() != requested_block_number {
499 return Err(HeadersResponseError {
500 request,
501 peer_id: Some(peer_id),
502 error: DownloadError::HeadersResponseStartBlockMismatch(GotExpected {
503 got: highest.number(),
504 expected: requested_block_number,
505 }),
506 }
507 .into())
508 }
509
510 if highest.number() == self.next_chain_tip_block_number {
512 self.process_next_headers(request, headers, peer_id)?;
514 self.try_validate_buffered()
516 .map(Err::<(), ReverseHeadersDownloaderError<H::Header>>)
517 .transpose()?;
518 } else if highest.number() > self.existing_local_block_number() {
519 self.metrics.buffered_responses.increment(1.);
520 self.buffered_responses.push(OrderedHeadersResponse {
522 headers,
523 request,
524 peer_id,
525 })
526 }
527
528 Ok(())
529 }
530 Err(err) => {
533 trace!(target: "downloaders::headers", %err, "Response error");
534 Err(HeadersResponseError { request, peer_id: None, error: err.into() }.into())
535 }
536 }
537 }
538
539 fn penalize_peer(&self, peer_id: Option<PeerId>, error: &DownloadError) {
540 if let Some(peer_id) = peer_id {
542 trace!(target: "downloaders::headers", ?peer_id, %error, "Penalizing peer");
543 self.client.report_bad_message(peer_id);
544 }
545 }
546
547 fn on_headers_error(&self, err: Box<HeadersResponseError>) {
551 let HeadersResponseError { request, peer_id, error } = *err;
552
553 self.penalize_peer(peer_id, &error);
554
555 self.metrics.increment_errors(&error);
557
558 self.submit_request(request, Priority::High);
560 }
561
562 fn try_validate_buffered(&mut self) -> Option<ReverseHeadersDownloaderError<H::Header>> {
566 loop {
567 let next_response = self.buffered_responses.peek_mut()?;
569 let next_block_number = next_response.block_number();
570 match next_block_number.cmp(&self.next_chain_tip_block_number) {
571 Ordering::Less => return None,
572 Ordering::Equal => {
573 let OrderedHeadersResponse { headers, request, peer_id } =
574 PeekMut::pop(next_response);
575 self.metrics.buffered_responses.decrement(1.);
576
577 if let Err(err) = self.process_next_headers(request, headers, peer_id) {
578 return Some(err)
579 }
580 }
581 Ordering::Greater => {
582 self.metrics.buffered_responses.decrement(1.);
583 PeekMut::pop(next_response);
584 }
585 }
586 }
587 }
588
589 const fn get_sync_target_request(&self, start: BlockHashOrNumber) -> HeadersRequest {
591 HeadersRequest::falling(start, 1)
592 }
593
594 fn submit_request(&self, request: HeadersRequest, priority: Priority) {
596 trace!(target: "downloaders::headers", ?request, "Submitting headers request");
597 self.in_progress_queue.push(self.request_fut(request, priority));
598 self.metrics.in_flight_requests.increment(1.);
599 }
600
601 fn request_fut(
602 &self,
603 request: HeadersRequest,
604 priority: Priority,
605 ) -> HeadersRequestFuture<H::Output> {
606 let client = Arc::clone(&self.client);
607 HeadersRequestFuture {
608 request: Some(request.clone()),
609 fut: client.get_headers_with_priority(request, priority),
610 }
611 }
612
613 fn validate(
615 &self,
616 header: &SealedHeader<H::Header>,
617 parent: &SealedHeader<H::Header>,
618 ) -> DownloadResult<()> {
619 validate_header_download(&self.consensus, header, parent)
620 }
621
622 fn clear(&mut self) {
624 self.lowest_validated_header.take();
625 self.queued_validated_headers = Vec::new();
626 self.buffered_responses = BinaryHeap::new();
627 self.in_progress_queue.clear();
628
629 self.metrics.in_flight_requests.set(0.);
630 self.metrics.buffered_responses.set(0.);
631 }
632
633 fn split_next_batch(&mut self) -> Vec<SealedHeader<H::Header>> {
635 let batch_size = self.stream_batch_size.min(self.queued_validated_headers.len());
636 let mut rem = self.queued_validated_headers.split_off(batch_size);
637 std::mem::swap(&mut rem, &mut self.queued_validated_headers);
638 rem.shrink_to_fit();
654 rem
655 }
656}
657
658impl<H> ReverseHeadersDownloader<H>
659where
660 H: HeadersClient,
661 Self: HeaderDownloader + 'static,
662{
663 pub fn into_task_with(
665 self,
666 runtime: &Runtime,
667 ) -> TaskDownloader<<Self as HeaderDownloader>::Header> {
668 TaskDownloader::spawn_with(self, runtime)
669 }
670}
671
672impl<H> HeaderDownloader for ReverseHeadersDownloader<H>
673where
674 H: HeadersClient<Header: reth_primitives_traits::BlockHeader> + 'static,
675{
676 type Header = H::Header;
677
678 fn update_local_head(&mut self, head: SealedHeader<H::Header>) {
679 while self
681 .queued_validated_headers
682 .last()
683 .is_some_and(|last| last.number() <= head.number())
684 {
685 self.queued_validated_headers.pop();
687 }
688 trace!(
689 target: "downloaders::headers",
690 head=?head.num_hash(),
691 "Updating local head"
692 );
693 self.local_head = Some(head);
695 }
696
697 fn update_sync_target(&mut self, target: SyncTarget) {
699 let current_tip = self.sync_target.as_ref().and_then(|t| t.hash());
700 trace!(
701 target: "downloaders::headers",
702 sync_target=?target,
703 current_tip=?current_tip,
704 "Updating sync target"
705 );
706 match target {
707 SyncTarget::Tip(tip) => {
708 if Some(tip) != current_tip {
709 trace!(target: "downloaders::headers", current=?current_tip, new=?tip, "Update sync target");
710 let new_sync_target = SyncTargetBlock::from_hash(tip);
711
712 if let Some(target_number) = self
715 .queued_validated_headers
716 .first()
717 .filter(|h| h.hash() == tip)
718 .map(|h| h.number())
719 {
720 self.sync_target = Some(new_sync_target.with_number(target_number));
721 return
722 }
723
724 trace!(target: "downloaders::headers", new=?target, "Request new sync target");
725 self.metrics.out_of_order_requests.increment(1);
726 self.sync_target = Some(new_sync_target);
727 self.sync_target_request = Some(
728 self.request_fut(self.get_sync_target_request(tip.into()), Priority::High),
729 );
730 }
731 }
732 SyncTarget::Gap(existing) => {
733 let target = existing.parent;
734 if Some(target) != current_tip {
735 self.sync_target_request.take();
737 let parent_block_number = existing.block.number.saturating_sub(1);
740
741 trace!(target: "downloaders::headers", current=?current_tip, new=?target, %parent_block_number, "Updated sync target");
742
743 self.sync_target = match self.sync_target.take() {
745 Some(sync_target) => Some(sync_target.with_hash(target)),
746 None => Some(SyncTargetBlock::from_hash(target)),
747 };
748 self.on_block_number_update(parent_block_number, parent_block_number);
749 }
750 }
751 SyncTarget::TipNum(num) => {
752 let current_tip_num = self.sync_target.as_ref().and_then(|t| t.number());
753 if Some(num) != current_tip_num {
754 trace!(target: "downloaders::headers", %num, "Updating sync target based on num");
755 self.sync_target = Some(SyncTargetBlock::from_number(num));
757 self.sync_target_request = Some(
758 self.request_fut(self.get_sync_target_request(num.into()), Priority::High),
759 );
760 }
761 }
762 }
763 }
764
765 fn set_batch_size(&mut self, batch_size: usize) {
766 self.stream_batch_size = batch_size;
767 }
768}
769
/// Yields batches of validated headers (highest block number first), or a downloader error.
impl<H> Stream for ReverseHeadersDownloader<H>
where
    H: HeadersClient<Header: reth_primitives_traits::BlockHeader> + 'static,
{
    type Item = HeadersDownloaderResult<Vec<SealedHeader<H::Header>>, H::Header>;

    /// Drives the downloader.
    ///
    /// Order of operations:
    /// 1. Stay pending until both local head and sync target are set.
    /// 2. Resolve a pending sync target request first (retrying on response errors).
    /// 3. Loop: drain finished requests, submit new ones up to the limits, return a batch
    ///    once `stream_batch_size` headers are queued, stop when nothing new was submitted.
    /// 4. With no request in flight, flush the remaining headers or end the stream.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();

        // The sync boundaries are required before anything can be requested.
        // NOTE(review): no waker is registered on this path — presumably the owner re-polls
        // after updating head/target; confirm against the caller.
        if this.local_head.is_none() || this.sync_target.is_none() {
            trace!(
                target: "downloaders::headers",
                head=?this.local_block_number(),
                sync_target=?this.sync_target,
                "The downloader sync boundaries have not been set"
            );
            return Poll::Pending
        }

        // A pending sync target request must complete before batched requests are sent.
        while let Some(mut req) = this.sync_target_request.take() {
            match req.poll_unpin(cx) {
                Poll::Ready(outcome) => {
                    match this.on_sync_target_outcome(outcome) {
                        Ok(()) => break,
                        Err(ReverseHeadersDownloaderError::Response(error)) => {
                            trace!(target: "downloaders::headers", %error, "invalid sync target response");
                            // A closed channel ends the stream.
                            if error.is_channel_closed() {
                                return Poll::Ready(None)
                            }

                            // Penalize, then retry the same request; the surrounding
                            // `while let` picks it up and polls it right away.
                            this.penalize_peer(error.peer_id, &error.error);
                            this.metrics.increment_errors(&error.error);
                            this.sync_target_request =
                                Some(this.request_fut(error.request, Priority::High));
                        }
                        Err(ReverseHeadersDownloaderError::Downloader(error)) => {
                            this.clear();
                            return Poll::Ready(Some(Err(error)))
                        }
                    };
                }
                Poll::Pending => {
                    // Put the request back so it is polled again on the next wake-up.
                    this.sync_target_request = Some(req);
                    return Poll::Pending
                }
            }
        }

        // Release unused capacity of the response buffer.
        this.buffered_responses.shrink_to_fit();

        loop {
            // Handle every request that has completed.
            while let Poll::Ready(Some(outcome)) = this.in_progress_queue.poll_next_unpin(cx) {
                this.metrics.in_flight_requests.decrement(1.);
                match this.on_headers_outcome(outcome) {
                    Ok(()) => (),
                    Err(ReverseHeadersDownloaderError::Response(error)) => {
                        // A closed channel ends the stream.
                        if error.is_channel_closed() {
                            return Poll::Ready(None)
                        }
                        // Penalizes the peer and re-submits the request.
                        this.on_headers_error(error);
                    }
                    Err(ReverseHeadersDownloaderError::Downloader(error)) => {
                        this.clear();
                        return Poll::Ready(Some(Err(error)))
                    }
                };
            }

            this.buffered_responses.shrink_to_fit();

            // Tracks whether this iteration submitted at least one new request.
            let mut progress = false;

            // Submit new requests until the concurrency limit or the buffer limit is hit,
            // or there is nothing left to request.
            let concurrent_request_limit = this.concurrent_request_limit();
            while this.in_progress_queue.len() < concurrent_request_limit &&
                this.buffered_responses.len() < this.max_buffered_responses
            {
                if let Some(request) = this.next_request() {
                    trace!(
                        target: "downloaders::headers",
                        "Requesting headers {request:?}"
                    );
                    progress = true;
                    this.submit_request(request, Priority::Normal);
                } else {
                    break
                }
            }

            // Yield a batch as soon as enough validated headers are queued.
            if this.queued_validated_headers.len() >= this.stream_batch_size {
                let next_batch = this.split_next_batch();

                // If the batch drained the queue, remember its lowest header so that
                // follow-up responses can still be validated against it.
                if this.queued_validated_headers.is_empty() {
                    this.lowest_validated_header = next_batch.last().cloned();
                }

                trace!(target: "downloaders::headers", batch=%next_batch.len(), "Returning validated batch");

                this.metrics.total_flushed.increment(next_batch.len() as u64);
                return Poll::Ready(Some(Ok(next_batch)))
            }

            // Nothing new was submitted, so another iteration would not change anything.
            if !progress {
                break
            }
        }

        // No request in flight: flush what is left, or end the stream if nothing is queued.
        if this.in_progress_queue.is_empty() {
            let next_batch = this.split_next_batch();
            if next_batch.is_empty() {
                this.clear();
                return Poll::Ready(None)
            }
            this.metrics.total_flushed.increment(next_batch.len() as u64);
            return Poll::Ready(Some(Ok(next_batch)))
        }

        Poll::Pending
    }
}
914
/// A future that resolves a headers request and returns the request together with its
/// outcome.
#[derive(Debug)]
struct HeadersRequestFuture<F> {
    /// The request being resolved; taken out when the future completes.
    request: Option<HeadersRequest>,
    /// The underlying client future.
    fut: F,
}
921
922impl<F, H> Future for HeadersRequestFuture<F>
923where
924 F: Future<Output = PeerRequestResult<Vec<H>>> + Sync + Send + Unpin,
925{
926 type Output = HeadersRequestOutcome<H>;
927
928 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
929 let this = self.get_mut();
930 let outcome = ready!(this.fut.poll_unpin(cx));
931 let request = this.request.take().unwrap();
932
933 Poll::Ready(HeadersRequestOutcome { request, outcome })
934 }
935}
936
/// The outcome of a headers request: the original request and the client's result.
struct HeadersRequestOutcome<H> {
    /// The request that was sent.
    request: HeadersRequest,
    /// The result returned by the client for that request.
    outcome: PeerRequestResult<Vec<H>>,
}
942
943impl<H> HeadersRequestOutcome<H> {
946 const fn block_number(&self) -> u64 {
947 self.request.start.as_number().expect("is number")
948 }
949}
950
/// A headers response kept in the buffer until it can be validated; compared and ordered by
/// the start block number of its request.
#[derive(Debug)]
struct OrderedHeadersResponse<H> {
    /// The headers of the response.
    headers: Vec<H>,
    /// The request this response answers.
    request: HeadersRequest,
    /// The peer that sent the response.
    peer_id: PeerId,
}
958
959impl<H> OrderedHeadersResponse<H> {
962 const fn block_number(&self) -> u64 {
963 self.request.start.as_number().expect("is number")
964 }
965}
966
967impl<H> PartialEq for OrderedHeadersResponse<H> {
968 fn eq(&self, other: &Self) -> bool {
969 self.block_number() == other.block_number()
970 }
971}
972
/// Equality is based on a `u64` block number, so it is a full equivalence relation.
impl<H> Eq for OrderedHeadersResponse<H> {}
974
975impl<H> PartialOrd for OrderedHeadersResponse<H> {
976 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
977 Some(self.cmp(other))
978 }
979}
980
981impl<H> Ord for OrderedHeadersResponse<H> {
982 fn cmp(&self, other: &Self) -> Ordering {
983 self.block_number().cmp(&other.block_number())
984 }
985}
986
/// An error tied to a single headers request, carrying everything needed to penalize the
/// peer and retry the request.
#[derive(Debug, Error)]
#[error("error requesting headers from peer {peer_id:?}: {error}; request: {request:?}")]
struct HeadersResponseError {
    /// The request that failed.
    request: HeadersRequest,
    /// The peer that answered, if any; `None` when the request itself failed.
    peer_id: Option<PeerId>,
    /// The underlying download error.
    #[source]
    error: DownloadError,
}
996
997impl HeadersResponseError {
998 const fn is_channel_closed(&self) -> bool {
1000 if let DownloadError::RequestError(ref err) = self.error {
1001 return err.is_channel_closed()
1002 }
1003 false
1004 }
1005}
1006
/// The block the downloader syncs to, identified by hash, by number, or by both.
#[derive(Clone, Debug)]
pub enum SyncTargetBlock {
    /// Only the block hash is known.
    Hash(B256),
    /// Only the block number is known.
    Number(u64),
    /// Both the block hash and the block number are known.
    HashAndNumber {
        /// Hash of the target block.
        hash: B256,
        /// Number of the target block.
        number: u64,
    },
}
1023
1024impl SyncTargetBlock {
1025 const fn from_hash(hash: B256) -> Self {
1027 Self::Hash(hash)
1028 }
1029
1030 const fn from_number(num: u64) -> Self {
1032 Self::Number(num)
1033 }
1034
1035 const fn with_hash(self, hash: B256) -> Self {
1037 match self {
1038 Self::Hash(_) => Self::Hash(hash),
1039 Self::Number(number) | Self::HashAndNumber { number, .. } => {
1040 Self::HashAndNumber { hash, number }
1041 }
1042 }
1043 }
1044
1045 const fn with_number(self, number: u64) -> Self {
1047 match self {
1048 Self::Hash(hash) | Self::HashAndNumber { hash, .. } => {
1049 Self::HashAndNumber { hash, number }
1050 }
1051 Self::Number(_) => Self::Number(number),
1052 }
1053 }
1054
1055 const fn replace_number(&mut self, number: u64) -> Option<u64> {
1060 match self {
1061 Self::Hash(hash) => {
1062 *self = Self::HashAndNumber { hash: *hash, number };
1063 None
1064 }
1065 Self::Number(old_number) => {
1066 let res = Some(*old_number);
1067 *self = Self::Number(number);
1068 res
1069 }
1070 Self::HashAndNumber { number: old_number, hash } => {
1071 let res = Some(*old_number);
1072 *self = Self::HashAndNumber { hash: *hash, number };
1073 res
1074 }
1075 }
1076 }
1077
1078 const fn hash(&self) -> Option<B256> {
1080 match self {
1081 Self::Hash(hash) | Self::HashAndNumber { hash, .. } => Some(*hash),
1082 Self::Number(_) => None,
1083 }
1084 }
1085
1086 const fn number(&self) -> Option<u64> {
1088 match self {
1089 Self::Hash(_) => None,
1090 Self::Number(number) | Self::HashAndNumber { number, .. } => Some(*number),
1091 }
1092 }
1093}
1094
/// Builder for [`ReverseHeadersDownloader`].
#[derive(Debug)]
pub struct ReverseHeadersDownloaderBuilder {
    /// Maximum number of headers asked for in a single request.
    request_limit: u64,
    /// Number of headers per yielded batch.
    stream_batch_size: usize,
    /// Lower bound used when computing the concurrent request limit.
    min_concurrent_requests: usize,
    /// Upper bound used when computing the concurrent request limit.
    max_concurrent_requests: usize,
    /// Number of buffered responses at which the downloader stops issuing new requests.
    max_buffered_responses: usize,
}
1110
1111impl ReverseHeadersDownloaderBuilder {
1112 pub fn new(config: HeadersConfig) -> Self {
1115 Self::default()
1116 .request_limit(config.downloader_request_limit)
1117 .min_concurrent_requests(config.downloader_min_concurrent_requests)
1118 .max_concurrent_requests(config.downloader_max_concurrent_requests)
1119 .max_buffered_responses(config.downloader_max_buffered_responses)
1120 .stream_batch_size(config.commit_threshold as usize)
1121 }
1122}
1123
1124impl Default for ReverseHeadersDownloaderBuilder {
1125 fn default() -> Self {
1126 Self {
1127 stream_batch_size: 10_000,
1128 request_limit: 1_000,
1131 max_concurrent_requests: 100,
1132 min_concurrent_requests: 5,
1133 max_buffered_responses: 100,
1134 }
1135 }
1136}
1137
1138impl ReverseHeadersDownloaderBuilder {
1139 pub const fn request_limit(mut self, limit: u64) -> Self {
1144 self.request_limit = limit;
1145 self
1146 }
1147
1148 pub const fn stream_batch_size(mut self, size: usize) -> Self {
1154 self.stream_batch_size = size;
1155 self
1156 }
1157
1158 pub const fn min_concurrent_requests(mut self, min_concurrent_requests: usize) -> Self {
1163 self.min_concurrent_requests = min_concurrent_requests;
1164 self
1165 }
1166
1167 pub const fn max_concurrent_requests(mut self, max_concurrent_requests: usize) -> Self {
1171 self.max_concurrent_requests = max_concurrent_requests;
1172 self
1173 }
1174
1175 pub const fn max_buffered_responses(mut self, max_buffered_responses: usize) -> Self {
1182 self.max_buffered_responses = max_buffered_responses;
1183 self
1184 }
1185
1186 pub fn build<H>(
1189 self,
1190 client: H,
1191 consensus: Arc<dyn HeaderValidator<H::Header>>,
1192 ) -> ReverseHeadersDownloader<H>
1193 where
1194 H: HeadersClient + 'static,
1195 {
1196 let Self {
1197 request_limit,
1198 stream_batch_size,
1199 min_concurrent_requests,
1200 max_concurrent_requests,
1201 max_buffered_responses,
1202 } = self;
1203 ReverseHeadersDownloader {
1204 consensus,
1205 client: Arc::new(client),
1206 local_head: None,
1207 sync_target: None,
1208 next_request_block_number: 0,
1211 next_chain_tip_block_number: 0,
1212 lowest_validated_header: None,
1213 request_limit,
1214 min_concurrent_requests,
1215 max_concurrent_requests,
1216 stream_batch_size,
1217 max_buffered_responses,
1218 sync_target_request: None,
1219 in_progress_queue: Default::default(),
1220 buffered_responses: Default::default(),
1221 queued_validated_headers: Default::default(),
1222 metrics: Default::default(),
1223 }
1224 }
1225}
1226
1227#[inline]
1234fn calc_next_request(
1235 local_head: u64,
1236 next_request_block_number: u64,
1237 request_limit: u64,
1238) -> HeadersRequest {
1239 let diff = next_request_block_number - local_head;
1241 let limit = diff.min(request_limit);
1242 let start = next_request_block_number;
1243 HeadersRequest::falling(start.into(), limit)
1244}
1245
1246#[cfg(test)]
1247mod tests {
1248 use super::*;
1249 use crate::headers::test_utils::child_header;
1250 use alloy_consensus::Header;
1251 use alloy_eips::{eip1898::BlockWithParent, BlockNumHash};
1252 use assert_matches::assert_matches;
1253 use reth_consensus::test_utils::TestConsensus;
1254 use reth_network_p2p::test_utils::TestHeadersClient;
1255
#[test]
fn test_replace_number_semantics() {
    // `replace_number` must behave like `Option::replace` on the target's number.
    //
    // Each case: (initial target, the same number as `Option<u64>`, number to set,
    // expected previous number).
    let cases = [
        (SyncTargetBlock::Hash(B256::random()), None, 1, None),
        (SyncTargetBlock::Number(1), Some(1), 2, Some(1)),
        (
            SyncTargetBlock::HashAndNumber { hash: B256::random(), number: 1 },
            Some(1),
            2,
            Some(1),
        ),
    ];

    for (mut target, mut as_option, replacement, expected_previous) in cases {
        assert_eq!(target.replace_number(replacement), expected_previous);
        assert_eq!(target.number(), Some(replacement));

        assert_eq!(as_option.replace(replacement), expected_previous);
        assert_eq!(as_option, Some(replacement));
    }
}
1314
#[test]
fn test_sync_target_update() {
    let client = Arc::new(TestHeadersClient::default());
    let mut downloader = ReverseHeadersDownloaderBuilder::default()
        .build(Arc::clone(&client), Arc::new(TestConsensus::default()));

    let genesis = SealedHeader::default();
    downloader.update_local_head(genesis);
    downloader.update_sync_target(SyncTarget::Tip(B256::random()));
    downloader.sync_target_request = None;

    // A different tip hash must trigger a new sync target request.
    downloader.update_sync_target(SyncTarget::Tip(B256::random()));
    assert!(downloader.sync_target_request.is_some());
    downloader.sync_target_request = None;

    // A gap target is resolved locally: no request, but the number becomes known.
    let gap = BlockWithParent {
        block: BlockNumHash::new(0, B256::random()),
        parent: Default::default(),
    };
    downloader.update_sync_target(SyncTarget::Gap(gap));
    assert!(downloader.sync_target_request.is_none());
    assert_matches!(
        downloader.sync_target,
        Some(target) => target.number().is_some()
    );
}
1345
#[test]
fn test_head_update() {
    let client = Arc::new(TestHeadersClient::default());
    let mut downloader = ReverseHeadersDownloaderBuilder::default()
        .build(Arc::clone(&client), Arc::new(TestConsensus::default()));

    let header: SealedHeader = SealedHeader::default();
    downloader.update_local_head(header.clone());
    downloader.update_sync_target(SyncTarget::Tip(B256::random()));

    // Queue a header, then move the local head above it: the queue must be emptied.
    downloader.queued_validated_headers.push(header.clone());

    let mut child = header.as_ref().clone();
    child.number += 1;
    let new_head = SealedHeader::new(child, B256::random());
    downloader.update_local_head(new_head);

    assert!(downloader.queued_validated_headers.is_empty());
}
1364
/// Checks the start block and limit produced by `calc_next_request`.
#[test]
fn test_request_calc() {
    // Each row: (local head, next block to request, request limit, expected request limit).
    let scenarios = [
        // Far above the local head: an entire batch is requested.
        (0, 1000, 2, 2),
        // Directly above the local head: only a single header is requested.
        (999, 1000, 2, 1),
    ];

    for (local, next, batch_size, expected_limit) in scenarios {
        let request = calc_next_request(local, next, batch_size);
        assert_eq!(request.start, next.into());
        assert_eq!(request.limit, expected_limit);
    }
}
1383
/// Checks that repeated `next_request` calls walk downwards from the starting block.
///
/// Every request must start where the previous one ended, the limits must sum up to the
/// starting block number, and the request pointer must end at the local block number.
#[test]
fn test_next_request() {
    let headers_client = Arc::new(TestHeadersClient::default());

    let request_limit = 99;
    let first_block = 1000;
    let mut downloader = ReverseHeadersDownloaderBuilder::default()
        .request_limit(request_limit)
        .build(Arc::clone(&headers_client), Arc::new(TestConsensus::default()));
    downloader.update_local_head(SealedHeader::default());
    downloader.update_sync_target(SyncTarget::Tip(B256::random()));

    downloader.next_request_block_number = first_block;

    // Drain requests until none is left, accumulating how many headers were asked for.
    let covered =
        std::iter::from_fn(|| downloader.next_request()).fold(0, |requested, request| {
            assert_eq!(request.start, (first_block - requested).into());
            requested + request.limit
        });

    assert_eq!(covered, first_block);
    assert_eq!(Some(downloader.next_request_block_number), downloader.local_block_number());
}
1409
/// Checks that a heap of `OrderedHeadersResponse` pops the higher start block first.
#[test]
fn test_resp_order() {
    let (hi, lo) = (1u64, 0u64);

    // Insert the responses in the same order they are expected to come back out.
    let mut heap = BinaryHeap::new();
    for start in [hi, lo] {
        heap.push(OrderedHeadersResponse::<Header> {
            headers: vec![],
            request: HeadersRequest {
                start: start.into(),
                limit: 0,
                direction: Default::default(),
            },
            peer_id: Default::default(),
        });
    }

    for expected in [hi, lo] {
        let response = heap.pop().unwrap();
        assert_eq!(response.block_number(), expected);
    }
}
1430
1431 #[tokio::test]
1432 async fn download_at_fork_head() {
1433 reth_tracing::init_test_tracing();
1434
1435 let client = Arc::new(TestHeadersClient::default());
1436
1437 let p3 = SealedHeader::default();
1438 let p2 = child_header(&p3);
1439 let p1 = child_header(&p2);
1440 let p0 = child_header(&p1);
1441
1442 let mut downloader = ReverseHeadersDownloaderBuilder::default()
1443 .stream_batch_size(3)
1444 .request_limit(3)
1445 .build(Arc::clone(&client), Arc::new(TestConsensus::default()));
1446 downloader.update_local_head(p3.clone());
1447 downloader.update_sync_target(SyncTarget::Tip(p0.hash()));
1448
1449 client
1450 .extend(vec![
1451 p0.as_ref().clone(),
1452 p1.as_ref().clone(),
1453 p2.as_ref().clone(),
1454 p3.as_ref().clone(),
1455 ])
1456 .await;
1457
1458 let headers = downloader.next().await.unwrap();
1459 assert_eq!(headers.unwrap(), vec![p0, p1, p2,]);
1460 assert!(downloader.buffered_responses.is_empty());
1461 assert!(downloader.next().await.is_none());
1462 assert!(downloader.next().await.is_none());
1463 }
1464
1465 #[tokio::test]
1466 async fn download_one_by_one() {
1467 reth_tracing::init_test_tracing();
1468 let p3 = SealedHeader::default();
1469 let p2 = child_header(&p3);
1470 let p1 = child_header(&p2);
1471 let p0 = child_header(&p1);
1472
1473 let client = Arc::new(TestHeadersClient::default());
1474 let mut downloader = ReverseHeadersDownloaderBuilder::default()
1475 .stream_batch_size(1)
1476 .request_limit(1)
1477 .build(Arc::clone(&client), Arc::new(TestConsensus::default()));
1478 downloader.update_local_head(p3.clone());
1479 downloader.update_sync_target(SyncTarget::Tip(p0.hash()));
1480
1481 client
1482 .extend(vec![
1483 p0.as_ref().clone(),
1484 p1.as_ref().clone(),
1485 p2.as_ref().clone(),
1486 p3.as_ref().clone(),
1487 ])
1488 .await;
1489
1490 let headers = downloader.next().await.unwrap();
1491 let headers = headers.unwrap();
1492 assert_eq!(headers, vec![p0]);
1493 assert_eq!(headers.capacity(), headers.len());
1494
1495 let headers = downloader.next().await.unwrap();
1496 let headers = headers.unwrap();
1497 assert_eq!(headers, vec![p1]);
1498 assert_eq!(headers.capacity(), headers.len());
1499
1500 let headers = downloader.next().await.unwrap();
1501 let headers = headers.unwrap();
1502 assert_eq!(headers, vec![p2]);
1503 assert_eq!(headers.capacity(), headers.len());
1504
1505 assert!(downloader.next().await.is_none());
1506 }
1507
1508 #[tokio::test]
1509 async fn download_one_by_one_larger_request_limit() {
1510 reth_tracing::init_test_tracing();
1511 let p3 = SealedHeader::default();
1512 let p2 = child_header(&p3);
1513 let p1 = child_header(&p2);
1514 let p0 = child_header(&p1);
1515
1516 let client = Arc::new(TestHeadersClient::default());
1517 let mut downloader = ReverseHeadersDownloaderBuilder::default()
1518 .stream_batch_size(1)
1519 .request_limit(3)
1520 .build(Arc::clone(&client), Arc::new(TestConsensus::default()));
1521 downloader.update_local_head(p3.clone());
1522 downloader.update_sync_target(SyncTarget::Tip(p0.hash()));
1523
1524 client
1525 .extend(vec![
1526 p0.as_ref().clone(),
1527 p1.as_ref().clone(),
1528 p2.as_ref().clone(),
1529 p3.as_ref().clone(),
1530 ])
1531 .await;
1532
1533 let headers = downloader.next().await.unwrap();
1534 let headers = headers.unwrap();
1535 assert_eq!(headers, vec![p0]);
1536 assert_eq!(headers.capacity(), headers.len());
1537
1538 let headers = downloader.next().await.unwrap();
1539 let headers = headers.unwrap();
1540 assert_eq!(headers, vec![p1]);
1541 assert_eq!(headers.capacity(), headers.len());
1542
1543 let headers = downloader.next().await.unwrap();
1544 let headers = headers.unwrap();
1545 assert_eq!(headers, vec![p2]);
1546 assert_eq!(headers.capacity(), headers.len());
1547
1548 assert!(downloader.next().await.is_none());
1549 }
1550}