1use super::task::TaskDownloader;
4use crate::metrics::HeaderDownloaderMetrics;
5use alloy_consensus::BlockHeader;
6use alloy_eips::BlockHashOrNumber;
7use alloy_primitives::{BlockNumber, Sealable, B256};
8use futures::{stream::Stream, FutureExt};
9use futures_util::{stream::FuturesUnordered, StreamExt};
10use rayon::prelude::*;
11use reth_config::config::HeadersConfig;
12use reth_consensus::HeaderValidator;
13use reth_network_p2p::{
14 error::{DownloadError, DownloadResult, PeerRequestResult},
15 headers::{
16 client::{HeadersClient, HeadersRequest},
17 downloader::{validate_header_download, HeaderDownloader, SyncTarget},
18 error::{HeadersDownloaderError, HeadersDownloaderResult},
19 },
20 priority::Priority,
21};
22use reth_network_peers::PeerId;
23use reth_primitives_traits::{GotExpected, SealedHeader};
24use reth_tasks::{TaskSpawner, TokioTaskExecutor};
25use std::{
26 cmp::{Ordering, Reverse},
27 collections::{binary_heap::PeekMut, BinaryHeap},
28 future::Future,
29 pin::Pin,
30 sync::Arc,
31 task::{ready, Context, Poll},
32};
33use thiserror::Error;
34use tracing::{debug, error, trace};
35
36const REQUESTS_PER_PEER_MULTIPLIER: usize = 5;
40
41#[derive(Error, Debug)]
43enum ReverseHeadersDownloaderError<H: Sealable> {
44 #[error(transparent)]
45 Downloader(#[from] HeadersDownloaderError<H>),
46 #[error(transparent)]
47 Response(#[from] Box<HeadersResponseError>),
48}
49
50impl<H: Sealable> From<HeadersResponseError> for ReverseHeadersDownloaderError<H> {
51 fn from(value: HeadersResponseError) -> Self {
52 Self::Response(Box::new(value))
53 }
54}
55
56#[must_use = "Stream does nothing unless polled"]
68#[derive(Debug)]
69pub struct ReverseHeadersDownloader<H: HeadersClient> {
70 consensus: Arc<dyn HeaderValidator<H::Header>>,
72 client: Arc<H>,
74 local_head: Option<SealedHeader<H::Header>>,
76 sync_target: Option<SyncTargetBlock>,
78 next_request_block_number: u64,
80 lowest_validated_header: Option<SealedHeader<H::Header>>,
82 next_chain_tip_block_number: u64,
84 request_limit: u64,
86 min_concurrent_requests: usize,
88 max_concurrent_requests: usize,
90 stream_batch_size: usize,
92 max_buffered_responses: usize,
94 sync_target_request: Option<HeadersRequestFuture<H::Output>>,
99 in_progress_queue: FuturesUnordered<HeadersRequestFuture<H::Output>>,
101 buffered_responses: BinaryHeap<OrderedHeadersResponse<H::Header>>,
103 queued_validated_headers: Vec<SealedHeader<H::Header>>,
107 metrics: HeaderDownloaderMetrics,
109}
110
111impl<H> ReverseHeadersDownloader<H>
114where
115 H: HeadersClient<Header: reth_primitives_traits::BlockHeader> + 'static,
116{
117 pub fn builder() -> ReverseHeadersDownloaderBuilder {
119 ReverseHeadersDownloaderBuilder::default()
120 }
121
122 #[inline]
124 fn local_block_number(&self) -> Option<BlockNumber> {
125 self.local_head.as_ref().map(|h| h.number())
126 }
127
128 #[inline]
134 fn existing_local_block_number(&self) -> BlockNumber {
135 self.local_head.as_ref().expect("is initialized").number()
136 }
137
138 #[inline]
144 fn existing_sync_target(&self) -> SyncTargetBlock {
145 self.sync_target.as_ref().expect("is initialized").clone()
146 }
147
148 #[inline]
153 fn concurrent_request_limit(&self) -> usize {
154 let num_peers = self.client.num_connected_peers();
155
156 let dynamic_target = num_peers * REQUESTS_PER_PEER_MULTIPLIER;
159 let max_dynamic = dynamic_target.max(self.min_concurrent_requests);
160
161 if num_peers < self.min_concurrent_requests {
163 return max_dynamic
164 }
165
166 max_dynamic.min(self.max_concurrent_requests)
167 }
168
169 fn next_request(&mut self) -> Option<HeadersRequest> {
175 if let Some(local_head) = self.local_block_number() &&
176 self.next_request_block_number > local_head
177 {
178 let request =
179 calc_next_request(local_head, self.next_request_block_number, self.request_limit);
180 self.next_request_block_number -= request.limit;
183
184 return Some(request)
185 }
186
187 None
188 }
189
190 fn lowest_validated_header(&self) -> Option<&SealedHeader<H::Header>> {
200 self.queued_validated_headers.last().or(self.lowest_validated_header.as_ref())
201 }
202
203 fn reset(&mut self) {
207 debug!(target: "downloaders::headers", "Resetting headers downloader");
208 self.next_request_block_number = 0;
209 self.next_chain_tip_block_number = 0;
210 self.sync_target.take();
211 }
212
213 fn validate_sync_target(
215 &self,
216 header: &SealedHeader<H::Header>,
217 request: HeadersRequest,
218 peer_id: PeerId,
219 ) -> Result<(), Box<HeadersResponseError>> {
220 match self.existing_sync_target() {
221 SyncTargetBlock::Hash(hash) | SyncTargetBlock::HashAndNumber { hash, .. }
222 if header.hash() != hash =>
223 {
224 Err(Box::new(HeadersResponseError {
225 request,
226 peer_id: Some(peer_id),
227 error: DownloadError::InvalidTip(
228 GotExpected { got: header.hash(), expected: hash }.into(),
229 ),
230 }))
231 }
232 SyncTargetBlock::Number(number) if header.number() != number => {
233 Err(Box::new(HeadersResponseError {
234 request,
235 peer_id: Some(peer_id),
236 error: DownloadError::InvalidTipNumber(GotExpected {
237 got: header.number(),
238 expected: number,
239 }),
240 }))
241 }
242 _ => Ok(()),
243 }
244 }
245
246 fn process_next_headers(
254 &mut self,
255 request: HeadersRequest,
256 headers: Vec<H::Header>,
257 peer_id: PeerId,
258 ) -> Result<(), ReverseHeadersDownloaderError<H::Header>> {
259 let mut validated = Vec::with_capacity(headers.len());
260
261 let sealed_headers =
262 headers.into_par_iter().map(SealedHeader::seal_slow).collect::<Vec<_>>();
263 for parent in sealed_headers {
264 if let Some(validated_header) =
266 validated.last().or_else(|| self.lowest_validated_header())
267 {
268 if let Err(error) = self.validate(validated_header, &parent) {
269 trace!(target: "downloaders::headers", %error ,"Failed to validate header");
270 return Err(
271 HeadersResponseError { request, peer_id: Some(peer_id), error }.into()
272 )
273 }
274 } else {
275 self.validate_sync_target(&parent, request.clone(), peer_id)?;
276 }
277
278 validated.push(parent);
279 }
280
281 if let Some((last_header, head)) = validated
283 .last_mut()
284 .zip(self.local_head.as_ref())
285 .filter(|(last, head)| last.number() == head.number() + 1)
286 {
287 if let Err(error) = self.consensus.validate_header(&*last_header) {
289 trace!(target: "downloaders::headers", %error, "Failed to validate header");
290 return Err(HeadersResponseError {
291 request,
292 peer_id: Some(peer_id),
293 error: DownloadError::HeaderValidation {
294 hash: head.hash(),
295 number: head.number(),
296 error: Box::new(error),
297 },
298 }
299 .into())
300 }
301
302 if let Err(error) = self.consensus.validate_header_against_parent(&*last_header, head) {
309 let local_head = head.clone();
310 error!(target: "downloaders::headers", %error, number = last_header.number(), hash = ?last_header.hash(), "Header cannot be attached to known canonical chain");
312
313 self.reset();
318
319 return Err(HeadersDownloaderError::DetachedHead {
320 local_head: Box::new(local_head),
321 header: Box::new(last_header.clone()),
322 error: Box::new(error),
323 }
324 .into())
325 }
326 }
327
328 self.next_chain_tip_block_number =
330 validated.last().expect("exists").number().saturating_sub(1);
331 self.queued_validated_headers.extend(validated);
332
333 Ok(())
334 }
335
336 fn on_block_number_update(&mut self, target_block_number: u64, next_block: u64) {
347 if let Some(old_target) =
349 self.sync_target.as_mut().and_then(|t| t.replace_number(target_block_number))
350 {
351 if target_block_number > old_target {
352 self.next_request_block_number = next_block;
355 self.next_chain_tip_block_number = next_block;
356 self.clear();
357 } else {
358 let skip = self
360 .queued_validated_headers
361 .iter()
362 .take_while(|last| last.number() > target_block_number)
363 .count();
364 self.queued_validated_headers.drain(..skip);
366 }
367 } else {
368 self.next_request_block_number = next_block;
370 self.next_chain_tip_block_number = next_block;
371 }
372 }
373
374 fn on_sync_target_outcome(
376 &mut self,
377 response: HeadersRequestOutcome<H::Header>,
378 ) -> Result<(), ReverseHeadersDownloaderError<H::Header>> {
379 let sync_target = self.existing_sync_target();
380 let HeadersRequestOutcome { request, outcome } = response;
381 match outcome {
382 Ok(res) => {
383 let (peer_id, mut headers) = res.split();
384
385 self.metrics.total_downloaded.increment(headers.len() as u64);
387
388 headers.sort_unstable_by_key(|h| Reverse(h.number()));
390
391 if headers.is_empty() {
392 return Err(HeadersResponseError {
393 request,
394 peer_id: Some(peer_id),
395 error: DownloadError::EmptyResponse,
396 }
397 .into())
398 }
399
400 let header = headers.swap_remove(0);
401 let target = SealedHeader::seal_slow(header);
402
403 match sync_target {
404 SyncTargetBlock::Hash(hash) | SyncTargetBlock::HashAndNumber { hash, .. } => {
405 if target.hash() != hash {
406 return Err(HeadersResponseError {
407 request,
408 peer_id: Some(peer_id),
409 error: DownloadError::InvalidTip(
410 GotExpected { got: target.hash(), expected: hash }.into(),
411 ),
412 }
413 .into())
414 }
415 }
416 SyncTargetBlock::Number(number) => {
417 if target.number() != number {
418 return Err(HeadersResponseError {
419 request,
420 peer_id: Some(peer_id),
421 error: DownloadError::InvalidTipNumber(GotExpected {
422 got: target.number(),
423 expected: number,
424 }),
425 }
426 .into())
427 }
428 }
429 }
430
431 trace!(target: "downloaders::headers", head=?self.local_block_number(), hash=?target.hash(), number=%target.number(), "Received sync target");
432
433 let parent_block_number = target.number().saturating_sub(1);
435 self.on_block_number_update(target.number(), parent_block_number);
436
437 self.queued_validated_headers.push(target);
438
439 self.try_validate_buffered()
441 .map(Err::<(), ReverseHeadersDownloaderError<H::Header>>)
442 .transpose()?;
443
444 Ok(())
445 }
446 Err(err) => {
447 Err(HeadersResponseError { request, peer_id: None, error: err.into() }.into())
448 }
449 }
450 }
451
452 fn on_headers_outcome(
454 &mut self,
455 response: HeadersRequestOutcome<H::Header>,
456 ) -> Result<(), ReverseHeadersDownloaderError<H::Header>> {
457 let requested_block_number = response.block_number();
458 let HeadersRequestOutcome { request, outcome } = response;
459
460 match outcome {
461 Ok(res) => {
462 let (peer_id, mut headers) = res.split();
463
464 self.metrics.total_downloaded.increment(headers.len() as u64);
466
467 trace!(target: "downloaders::headers", len=%headers.len(), "Received headers response");
468
469 if headers.is_empty() {
470 return Err(HeadersResponseError {
471 request,
472 peer_id: Some(peer_id),
473 error: DownloadError::EmptyResponse,
474 }
475 .into())
476 }
477
478 if (headers.len() as u64) != request.limit {
479 return Err(HeadersResponseError {
480 peer_id: Some(peer_id),
481 error: DownloadError::HeadersResponseTooShort(GotExpected {
482 got: headers.len() as u64,
483 expected: request.limit,
484 }),
485 request,
486 }
487 .into())
488 }
489
490 headers.sort_unstable_by_key(|h| Reverse(h.number()));
492
493 let highest = &headers[0];
495
496 trace!(target: "downloaders::headers", requested_block_number, highest=?highest.number(), "Validating non-empty headers response");
497
498 if highest.number() != requested_block_number {
499 return Err(HeadersResponseError {
500 request,
501 peer_id: Some(peer_id),
502 error: DownloadError::HeadersResponseStartBlockMismatch(GotExpected {
503 got: highest.number(),
504 expected: requested_block_number,
505 }),
506 }
507 .into())
508 }
509
510 if highest.number() == self.next_chain_tip_block_number {
512 self.process_next_headers(request, headers, peer_id)?;
514 self.try_validate_buffered()
516 .map(Err::<(), ReverseHeadersDownloaderError<H::Header>>)
517 .transpose()?;
518 } else if highest.number() > self.existing_local_block_number() {
519 self.metrics.buffered_responses.increment(1.);
520 self.buffered_responses.push(OrderedHeadersResponse {
522 headers,
523 request,
524 peer_id,
525 })
526 }
527
528 Ok(())
529 }
530 Err(err) => {
533 trace!(target: "downloaders::headers", %err, "Response error");
534 Err(HeadersResponseError { request, peer_id: None, error: err.into() }.into())
535 }
536 }
537 }
538
539 fn penalize_peer(&self, peer_id: Option<PeerId>, error: &DownloadError) {
540 if let Some(peer_id) = peer_id {
542 trace!(target: "downloaders::headers", ?peer_id, %error, "Penalizing peer");
543 self.client.report_bad_message(peer_id);
544 }
545 }
546
547 fn on_headers_error(&self, err: Box<HeadersResponseError>) {
551 let HeadersResponseError { request, peer_id, error } = *err;
552
553 self.penalize_peer(peer_id, &error);
554
555 self.metrics.increment_errors(&error);
557
558 self.submit_request(request, Priority::High);
560 }
561
562 fn try_validate_buffered(&mut self) -> Option<ReverseHeadersDownloaderError<H::Header>> {
566 loop {
567 let next_response = self.buffered_responses.peek_mut()?;
569 let next_block_number = next_response.block_number();
570 match next_block_number.cmp(&self.next_chain_tip_block_number) {
571 Ordering::Less => return None,
572 Ordering::Equal => {
573 let OrderedHeadersResponse { headers, request, peer_id } =
574 PeekMut::pop(next_response);
575 self.metrics.buffered_responses.decrement(1.);
576
577 if let Err(err) = self.process_next_headers(request, headers, peer_id) {
578 return Some(err)
579 }
580 }
581 Ordering::Greater => {
582 self.metrics.buffered_responses.decrement(1.);
583 PeekMut::pop(next_response);
584 }
585 }
586 }
587 }
588
589 const fn get_sync_target_request(&self, start: BlockHashOrNumber) -> HeadersRequest {
591 HeadersRequest::falling(start, 1)
592 }
593
594 fn submit_request(&self, request: HeadersRequest, priority: Priority) {
596 trace!(target: "downloaders::headers", ?request, "Submitting headers request");
597 self.in_progress_queue.push(self.request_fut(request, priority));
598 self.metrics.in_flight_requests.increment(1.);
599 }
600
601 fn request_fut(
602 &self,
603 request: HeadersRequest,
604 priority: Priority,
605 ) -> HeadersRequestFuture<H::Output> {
606 let client = Arc::clone(&self.client);
607 HeadersRequestFuture {
608 request: Some(request.clone()),
609 fut: client.get_headers_with_priority(request, priority),
610 }
611 }
612
613 fn validate(
615 &self,
616 header: &SealedHeader<H::Header>,
617 parent: &SealedHeader<H::Header>,
618 ) -> DownloadResult<()> {
619 validate_header_download(&self.consensus, header, parent)
620 }
621
622 fn clear(&mut self) {
624 self.lowest_validated_header.take();
625 self.queued_validated_headers = Vec::new();
626 self.buffered_responses = BinaryHeap::new();
627 self.in_progress_queue.clear();
628
629 self.metrics.in_flight_requests.set(0.);
630 self.metrics.buffered_responses.set(0.);
631 }
632
633 fn split_next_batch(&mut self) -> Vec<SealedHeader<H::Header>> {
635 let batch_size = self.stream_batch_size.min(self.queued_validated_headers.len());
636 let mut rem = self.queued_validated_headers.split_off(batch_size);
637 std::mem::swap(&mut rem, &mut self.queued_validated_headers);
638 rem.shrink_to_fit();
654 rem
655 }
656}
657
658impl<H> ReverseHeadersDownloader<H>
659where
660 H: HeadersClient,
661 Self: HeaderDownloader + 'static,
662{
663 pub fn into_task(self) -> TaskDownloader<<Self as HeaderDownloader>::Header> {
665 self.into_task_with(&TokioTaskExecutor::default())
666 }
667
668 pub fn into_task_with<S>(
670 self,
671 spawner: &S,
672 ) -> TaskDownloader<<Self as HeaderDownloader>::Header>
673 where
674 S: TaskSpawner,
675 {
676 TaskDownloader::spawn_with(self, spawner)
677 }
678}
679
680impl<H> HeaderDownloader for ReverseHeadersDownloader<H>
681where
682 H: HeadersClient<Header: reth_primitives_traits::BlockHeader> + 'static,
683{
684 type Header = H::Header;
685
686 fn update_local_head(&mut self, head: SealedHeader<H::Header>) {
687 while self
689 .queued_validated_headers
690 .last()
691 .is_some_and(|last| last.number() <= head.number())
692 {
693 self.queued_validated_headers.pop();
695 }
696 trace!(
697 target: "downloaders::headers",
698 head=?head.num_hash(),
699 "Updating local head"
700 );
701 self.local_head = Some(head);
703 }
704
705 fn update_sync_target(&mut self, target: SyncTarget) {
707 let current_tip = self.sync_target.as_ref().and_then(|t| t.hash());
708 trace!(
709 target: "downloaders::headers",
710 sync_target=?target,
711 current_tip=?current_tip,
712 "Updating sync target"
713 );
714 match target {
715 SyncTarget::Tip(tip) => {
716 if Some(tip) != current_tip {
717 trace!(target: "downloaders::headers", current=?current_tip, new=?tip, "Update sync target");
718 let new_sync_target = SyncTargetBlock::from_hash(tip);
719
720 if let Some(target_number) = self
723 .queued_validated_headers
724 .first()
725 .filter(|h| h.hash() == tip)
726 .map(|h| h.number())
727 {
728 self.sync_target = Some(new_sync_target.with_number(target_number));
729 return
730 }
731
732 trace!(target: "downloaders::headers", new=?target, "Request new sync target");
733 self.metrics.out_of_order_requests.increment(1);
734 self.sync_target = Some(new_sync_target);
735 self.sync_target_request = Some(
736 self.request_fut(self.get_sync_target_request(tip.into()), Priority::High),
737 );
738 }
739 }
740 SyncTarget::Gap(existing) => {
741 let target = existing.parent;
742 if Some(target) != current_tip {
743 self.sync_target_request.take();
745 let parent_block_number = existing.block.number.saturating_sub(1);
748
749 trace!(target: "downloaders::headers", current=?current_tip, new=?target, %parent_block_number, "Updated sync target");
750
751 self.sync_target = match self.sync_target.take() {
753 Some(sync_target) => Some(sync_target.with_hash(target)),
754 None => Some(SyncTargetBlock::from_hash(target)),
755 };
756 self.on_block_number_update(parent_block_number, parent_block_number);
757 }
758 }
759 SyncTarget::TipNum(num) => {
760 let current_tip_num = self.sync_target.as_ref().and_then(|t| t.number());
761 if Some(num) != current_tip_num {
762 trace!(target: "downloaders::headers", %num, "Updating sync target based on num");
763 self.sync_target = Some(SyncTargetBlock::from_number(num));
765 self.sync_target_request = Some(
766 self.request_fut(self.get_sync_target_request(num.into()), Priority::High),
767 );
768 }
769 }
770 }
771 }
772
773 fn set_batch_size(&mut self, batch_size: usize) {
774 self.stream_batch_size = batch_size;
775 }
776}
777
778impl<H> Stream for ReverseHeadersDownloader<H>
779where
780 H: HeadersClient<Header: reth_primitives_traits::BlockHeader> + 'static,
781{
782 type Item = HeadersDownloaderResult<Vec<SealedHeader<H::Header>>, H::Header>;
783
784 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
785 let this = self.get_mut();
786
787 if this.local_head.is_none() || this.sync_target.is_none() {
790 trace!(
791 target: "downloaders::headers",
792 head=?this.local_block_number(),
793 sync_target=?this.sync_target,
794 "The downloader sync boundaries have not been set"
795 );
796 return Poll::Pending
797 }
798
799 while let Some(mut req) = this.sync_target_request.take() {
802 match req.poll_unpin(cx) {
803 Poll::Ready(outcome) => {
804 match this.on_sync_target_outcome(outcome) {
805 Ok(()) => break,
806 Err(ReverseHeadersDownloaderError::Response(error)) => {
807 trace!(target: "downloaders::headers", %error, "invalid sync target response");
808 if error.is_channel_closed() {
809 return Poll::Ready(None)
811 }
812
813 this.penalize_peer(error.peer_id, &error.error);
814 this.metrics.increment_errors(&error.error);
815 this.sync_target_request =
816 Some(this.request_fut(error.request, Priority::High));
817 }
818 Err(ReverseHeadersDownloaderError::Downloader(error)) => {
819 this.clear();
820 return Poll::Ready(Some(Err(error)))
821 }
822 };
823 }
824 Poll::Pending => {
825 this.sync_target_request = Some(req);
826 return Poll::Pending
827 }
828 }
829 }
830
831 this.buffered_responses.shrink_to_fit();
833
834 loop {
843 while let Poll::Ready(Some(outcome)) = this.in_progress_queue.poll_next_unpin(cx) {
845 this.metrics.in_flight_requests.decrement(1.);
846 match this.on_headers_outcome(outcome) {
848 Ok(()) => (),
849 Err(ReverseHeadersDownloaderError::Response(error)) => {
850 if error.is_channel_closed() {
851 return Poll::Ready(None)
853 }
854 this.on_headers_error(error);
855 }
856 Err(ReverseHeadersDownloaderError::Downloader(error)) => {
857 this.clear();
858 return Poll::Ready(Some(Err(error)))
859 }
860 };
861 }
862
863 this.buffered_responses.shrink_to_fit();
865
866 let mut progress = false;
868
869 let concurrent_request_limit = this.concurrent_request_limit();
870 while this.in_progress_queue.len() < concurrent_request_limit &&
872 this.buffered_responses.len() < this.max_buffered_responses
873 {
874 if let Some(request) = this.next_request() {
875 trace!(
876 target: "downloaders::headers",
877 "Requesting headers {request:?}"
878 );
879 progress = true;
880 this.submit_request(request, Priority::Normal);
881 } else {
882 break
884 }
885 }
886
887 if this.queued_validated_headers.len() >= this.stream_batch_size {
889 let next_batch = this.split_next_batch();
890
891 if this.queued_validated_headers.is_empty() {
894 this.lowest_validated_header = next_batch.last().cloned();
895 }
896
897 trace!(target: "downloaders::headers", batch=%next_batch.len(), "Returning validated batch");
898
899 this.metrics.total_flushed.increment(next_batch.len() as u64);
900 return Poll::Ready(Some(Ok(next_batch)))
901 }
902
903 if !progress {
904 break
905 }
906 }
907
908 if this.in_progress_queue.is_empty() {
910 let next_batch = this.split_next_batch();
911 if next_batch.is_empty() {
912 this.clear();
913 return Poll::Ready(None)
914 }
915 this.metrics.total_flushed.increment(next_batch.len() as u64);
916 return Poll::Ready(Some(Ok(next_batch)))
917 }
918
919 Poll::Pending
920 }
921}
922
923#[derive(Debug)]
925struct HeadersRequestFuture<F> {
926 request: Option<HeadersRequest>,
927 fut: F,
928}
929
930impl<F, H> Future for HeadersRequestFuture<F>
931where
932 F: Future<Output = PeerRequestResult<Vec<H>>> + Sync + Send + Unpin,
933{
934 type Output = HeadersRequestOutcome<H>;
935
936 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
937 let this = self.get_mut();
938 let outcome = ready!(this.fut.poll_unpin(cx));
939 let request = this.request.take().unwrap();
940
941 Poll::Ready(HeadersRequestOutcome { request, outcome })
942 }
943}
944
945struct HeadersRequestOutcome<H> {
947 request: HeadersRequest,
948 outcome: PeerRequestResult<Vec<H>>,
949}
950
951impl<H> HeadersRequestOutcome<H> {
954 const fn block_number(&self) -> u64 {
955 self.request.start.as_number().expect("is number")
956 }
957}
958
959#[derive(Debug)]
961struct OrderedHeadersResponse<H> {
962 headers: Vec<H>,
963 request: HeadersRequest,
964 peer_id: PeerId,
965}
966
967impl<H> OrderedHeadersResponse<H> {
970 const fn block_number(&self) -> u64 {
971 self.request.start.as_number().expect("is number")
972 }
973}
974
975impl<H> PartialEq for OrderedHeadersResponse<H> {
976 fn eq(&self, other: &Self) -> bool {
977 self.block_number() == other.block_number()
978 }
979}
980
981impl<H> Eq for OrderedHeadersResponse<H> {}
982
983impl<H> PartialOrd for OrderedHeadersResponse<H> {
984 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
985 Some(self.cmp(other))
986 }
987}
988
989impl<H> Ord for OrderedHeadersResponse<H> {
990 fn cmp(&self, other: &Self) -> Ordering {
991 self.block_number().cmp(&other.block_number())
992 }
993}
994
995#[derive(Debug, Error)]
997#[error("error requesting headers from peer {peer_id:?}: {error}; request: {request:?}")]
998struct HeadersResponseError {
999 request: HeadersRequest,
1000 peer_id: Option<PeerId>,
1001 #[source]
1002 error: DownloadError,
1003}
1004
1005impl HeadersResponseError {
1006 const fn is_channel_closed(&self) -> bool {
1008 if let DownloadError::RequestError(ref err) = self.error {
1009 return err.is_channel_closed()
1010 }
1011 false
1012 }
1013}
1014
1015#[derive(Clone, Debug)]
1018pub enum SyncTargetBlock {
1019 Hash(B256),
1021 Number(u64),
1023 HashAndNumber {
1025 hash: B256,
1027 number: u64,
1029 },
1030}
1031
1032impl SyncTargetBlock {
1033 const fn from_hash(hash: B256) -> Self {
1035 Self::Hash(hash)
1036 }
1037
1038 const fn from_number(num: u64) -> Self {
1040 Self::Number(num)
1041 }
1042
1043 const fn with_hash(self, hash: B256) -> Self {
1045 match self {
1046 Self::Hash(_) => Self::Hash(hash),
1047 Self::Number(number) | Self::HashAndNumber { number, .. } => {
1048 Self::HashAndNumber { hash, number }
1049 }
1050 }
1051 }
1052
1053 const fn with_number(self, number: u64) -> Self {
1055 match self {
1056 Self::Hash(hash) | Self::HashAndNumber { hash, .. } => {
1057 Self::HashAndNumber { hash, number }
1058 }
1059 Self::Number(_) => Self::Number(number),
1060 }
1061 }
1062
1063 const fn replace_number(&mut self, number: u64) -> Option<u64> {
1068 match self {
1069 Self::Hash(hash) => {
1070 *self = Self::HashAndNumber { hash: *hash, number };
1071 None
1072 }
1073 Self::Number(old_number) => {
1074 let res = Some(*old_number);
1075 *self = Self::Number(number);
1076 res
1077 }
1078 Self::HashAndNumber { number: old_number, hash } => {
1079 let res = Some(*old_number);
1080 *self = Self::HashAndNumber { hash: *hash, number };
1081 res
1082 }
1083 }
1084 }
1085
1086 const fn hash(&self) -> Option<B256> {
1088 match self {
1089 Self::Hash(hash) | Self::HashAndNumber { hash, .. } => Some(*hash),
1090 Self::Number(_) => None,
1091 }
1092 }
1093
1094 const fn number(&self) -> Option<u64> {
1096 match self {
1097 Self::Hash(_) => None,
1098 Self::Number(number) | Self::HashAndNumber { number, .. } => Some(*number),
1099 }
1100 }
1101}
1102
1103#[derive(Debug)]
1106pub struct ReverseHeadersDownloaderBuilder {
1107 request_limit: u64,
1109 stream_batch_size: usize,
1111 min_concurrent_requests: usize,
1113 max_concurrent_requests: usize,
1115 max_buffered_responses: usize,
1117}
1118
1119impl ReverseHeadersDownloaderBuilder {
1120 pub fn new(config: HeadersConfig) -> Self {
1123 Self::default()
1124 .request_limit(config.downloader_request_limit)
1125 .min_concurrent_requests(config.downloader_min_concurrent_requests)
1126 .max_concurrent_requests(config.downloader_max_concurrent_requests)
1127 .max_buffered_responses(config.downloader_max_buffered_responses)
1128 .stream_batch_size(config.commit_threshold as usize)
1129 }
1130}
1131
1132impl Default for ReverseHeadersDownloaderBuilder {
1133 fn default() -> Self {
1134 Self {
1135 stream_batch_size: 10_000,
1136 request_limit: 1_000,
1139 max_concurrent_requests: 100,
1140 min_concurrent_requests: 5,
1141 max_buffered_responses: 100,
1142 }
1143 }
1144}
1145
1146impl ReverseHeadersDownloaderBuilder {
1147 pub const fn request_limit(mut self, limit: u64) -> Self {
1152 self.request_limit = limit;
1153 self
1154 }
1155
1156 pub const fn stream_batch_size(mut self, size: usize) -> Self {
1162 self.stream_batch_size = size;
1163 self
1164 }
1165
1166 pub const fn min_concurrent_requests(mut self, min_concurrent_requests: usize) -> Self {
1171 self.min_concurrent_requests = min_concurrent_requests;
1172 self
1173 }
1174
1175 pub const fn max_concurrent_requests(mut self, max_concurrent_requests: usize) -> Self {
1179 self.max_concurrent_requests = max_concurrent_requests;
1180 self
1181 }
1182
1183 pub const fn max_buffered_responses(mut self, max_buffered_responses: usize) -> Self {
1190 self.max_buffered_responses = max_buffered_responses;
1191 self
1192 }
1193
1194 pub fn build<H>(
1197 self,
1198 client: H,
1199 consensus: Arc<dyn HeaderValidator<H::Header>>,
1200 ) -> ReverseHeadersDownloader<H>
1201 where
1202 H: HeadersClient + 'static,
1203 {
1204 let Self {
1205 request_limit,
1206 stream_batch_size,
1207 min_concurrent_requests,
1208 max_concurrent_requests,
1209 max_buffered_responses,
1210 } = self;
1211 ReverseHeadersDownloader {
1212 consensus,
1213 client: Arc::new(client),
1214 local_head: None,
1215 sync_target: None,
1216 next_request_block_number: 0,
1219 next_chain_tip_block_number: 0,
1220 lowest_validated_header: None,
1221 request_limit,
1222 min_concurrent_requests,
1223 max_concurrent_requests,
1224 stream_batch_size,
1225 max_buffered_responses,
1226 sync_target_request: None,
1227 in_progress_queue: Default::default(),
1228 buffered_responses: Default::default(),
1229 queued_validated_headers: Default::default(),
1230 metrics: Default::default(),
1231 }
1232 }
1233}
1234
1235#[inline]
1242fn calc_next_request(
1243 local_head: u64,
1244 next_request_block_number: u64,
1245 request_limit: u64,
1246) -> HeadersRequest {
1247 let diff = next_request_block_number - local_head;
1249 let limit = diff.min(request_limit);
1250 let start = next_request_block_number;
1251 HeadersRequest::falling(start.into(), limit)
1252}
1253
1254#[cfg(test)]
1255mod tests {
1256 use super::*;
1257 use crate::headers::test_utils::child_header;
1258 use alloy_consensus::Header;
1259 use alloy_eips::{eip1898::BlockWithParent, BlockNumHash};
1260 use assert_matches::assert_matches;
1261 use reth_consensus::test_utils::TestConsensus;
1262 use reth_network_p2p::test_utils::TestHeadersClient;
1263
1264 #[test]
1266 fn test_replace_number_semantics() {
1267 struct Fixture {
1268 sync_target_block: SyncTargetBlock,
1270 sync_target_option: Option<u64>,
1271
1272 replace_number: u64,
1274
1275 expected_result: Option<u64>,
1277
1278 new_number: u64,
1280 }
1281
1282 let fixtures = vec![
1283 Fixture {
1284 sync_target_block: SyncTargetBlock::Hash(B256::random()),
1285 sync_target_option: None,
1287 replace_number: 1,
1288 expected_result: None,
1289 new_number: 1,
1290 },
1291 Fixture {
1292 sync_target_block: SyncTargetBlock::Number(1),
1293 sync_target_option: Some(1),
1294 replace_number: 2,
1295 expected_result: Some(1),
1296 new_number: 2,
1297 },
1298 Fixture {
1299 sync_target_block: SyncTargetBlock::HashAndNumber {
1300 hash: B256::random(),
1301 number: 1,
1302 },
1303 sync_target_option: Some(1),
1304 replace_number: 2,
1305 expected_result: Some(1),
1306 new_number: 2,
1307 },
1308 ];
1309
1310 for fixture in fixtures {
1311 let mut sync_target_block = fixture.sync_target_block;
1312 let result = sync_target_block.replace_number(fixture.replace_number);
1313 assert_eq!(result, fixture.expected_result);
1314 assert_eq!(sync_target_block.number(), Some(fixture.new_number));
1315
1316 let mut sync_target_option = fixture.sync_target_option;
1317 let option_result = sync_target_option.replace(fixture.replace_number);
1318 assert_eq!(option_result, fixture.expected_result);
1319 assert_eq!(sync_target_option, Some(fixture.new_number));
1320 }
1321 }
1322
1323 #[test]
1325 fn test_sync_target_update() {
1326 let client = Arc::new(TestHeadersClient::default());
1327
1328 let genesis = SealedHeader::default();
1329
1330 let mut downloader = ReverseHeadersDownloaderBuilder::default()
1331 .build(Arc::clone(&client), Arc::new(TestConsensus::default()));
1332 downloader.update_local_head(genesis);
1333 downloader.update_sync_target(SyncTarget::Tip(B256::random()));
1334
1335 downloader.sync_target_request.take();
1336
1337 let target = SyncTarget::Tip(B256::random());
1338 downloader.update_sync_target(target);
1339 assert!(downloader.sync_target_request.is_some());
1340
1341 downloader.sync_target_request.take();
1342 let target = SyncTarget::Gap(BlockWithParent {
1343 block: BlockNumHash::new(0, B256::random()),
1344 parent: Default::default(),
1345 });
1346 downloader.update_sync_target(target);
1347 assert!(downloader.sync_target_request.is_none());
1348 assert_matches!(
1349 downloader.sync_target,
1350 Some(target) => target.number().is_some()
1351 );
1352 }
1353
1354 #[test]
1356 fn test_head_update() {
1357 let client = Arc::new(TestHeadersClient::default());
1358
1359 let header: SealedHeader = SealedHeader::default();
1360
1361 let mut downloader = ReverseHeadersDownloaderBuilder::default()
1362 .build(Arc::clone(&client), Arc::new(TestConsensus::default()));
1363 downloader.update_local_head(header.clone());
1364 downloader.update_sync_target(SyncTarget::Tip(B256::random()));
1365
1366 downloader.queued_validated_headers.push(header.clone());
1367 let mut next = header.as_ref().clone();
1368 next.number += 1;
1369 downloader.update_local_head(SealedHeader::new(next, B256::random()));
1370 assert!(downloader.queued_validated_headers.is_empty());
1371 }
1372
1373 #[test]
1374 fn test_request_calc() {
1375 let local = 0;
1377 let next = 1000;
1378 let batch_size = 2;
1379 let request = calc_next_request(local, next, batch_size);
1380 assert_eq!(request.start, next.into());
1381 assert_eq!(request.limit, batch_size);
1382
1383 let local = 999;
1385 let next = 1000;
1386 let batch_size = 2;
1387 let request = calc_next_request(local, next, batch_size);
1388 assert_eq!(request.start, next.into());
1389 assert_eq!(request.limit, 1);
1390 }
1391
1392 #[test]
1394 fn test_next_request() {
1395 let client = Arc::new(TestHeadersClient::default());
1396
1397 let genesis = SealedHeader::default();
1398
1399 let batch_size = 99;
1400 let start = 1000;
1401 let mut downloader = ReverseHeadersDownloaderBuilder::default()
1402 .request_limit(batch_size)
1403 .build(Arc::clone(&client), Arc::new(TestConsensus::default()));
1404 downloader.update_local_head(genesis);
1405 downloader.update_sync_target(SyncTarget::Tip(B256::random()));
1406
1407 downloader.next_request_block_number = start;
1408
1409 let mut total = 0;
1410 while let Some(req) = downloader.next_request() {
1411 assert_eq!(req.start, (start - total).into());
1412 total += req.limit;
1413 }
1414 assert_eq!(total, start);
1415 assert_eq!(Some(downloader.next_request_block_number), downloader.local_block_number());
1416 }
1417
1418 #[test]
1419 fn test_resp_order() {
1420 let mut heap = BinaryHeap::new();
1421 let hi = 1u64;
1422 heap.push(OrderedHeadersResponse::<Header> {
1423 headers: vec![],
1424 request: HeadersRequest { start: hi.into(), limit: 0, direction: Default::default() },
1425 peer_id: Default::default(),
1426 });
1427
1428 let lo = 0u64;
1429 heap.push(OrderedHeadersResponse {
1430 headers: vec![],
1431 request: HeadersRequest { start: lo.into(), limit: 0, direction: Default::default() },
1432 peer_id: Default::default(),
1433 });
1434
1435 assert_eq!(heap.pop().unwrap().block_number(), hi);
1436 assert_eq!(heap.pop().unwrap().block_number(), lo);
1437 }
1438
1439 #[tokio::test]
1440 async fn download_at_fork_head() {
1441 reth_tracing::init_test_tracing();
1442
1443 let client = Arc::new(TestHeadersClient::default());
1444
1445 let p3 = SealedHeader::default();
1446 let p2 = child_header(&p3);
1447 let p1 = child_header(&p2);
1448 let p0 = child_header(&p1);
1449
1450 let mut downloader = ReverseHeadersDownloaderBuilder::default()
1451 .stream_batch_size(3)
1452 .request_limit(3)
1453 .build(Arc::clone(&client), Arc::new(TestConsensus::default()));
1454 downloader.update_local_head(p3.clone());
1455 downloader.update_sync_target(SyncTarget::Tip(p0.hash()));
1456
1457 client
1458 .extend(vec![
1459 p0.as_ref().clone(),
1460 p1.as_ref().clone(),
1461 p2.as_ref().clone(),
1462 p3.as_ref().clone(),
1463 ])
1464 .await;
1465
1466 let headers = downloader.next().await.unwrap();
1467 assert_eq!(headers, Ok(vec![p0, p1, p2,]));
1468 assert!(downloader.buffered_responses.is_empty());
1469 assert!(downloader.next().await.is_none());
1470 assert!(downloader.next().await.is_none());
1471 }
1472
1473 #[tokio::test]
1474 async fn download_one_by_one() {
1475 reth_tracing::init_test_tracing();
1476 let p3 = SealedHeader::default();
1477 let p2 = child_header(&p3);
1478 let p1 = child_header(&p2);
1479 let p0 = child_header(&p1);
1480
1481 let client = Arc::new(TestHeadersClient::default());
1482 let mut downloader = ReverseHeadersDownloaderBuilder::default()
1483 .stream_batch_size(1)
1484 .request_limit(1)
1485 .build(Arc::clone(&client), Arc::new(TestConsensus::default()));
1486 downloader.update_local_head(p3.clone());
1487 downloader.update_sync_target(SyncTarget::Tip(p0.hash()));
1488
1489 client
1490 .extend(vec![
1491 p0.as_ref().clone(),
1492 p1.as_ref().clone(),
1493 p2.as_ref().clone(),
1494 p3.as_ref().clone(),
1495 ])
1496 .await;
1497
1498 let headers = downloader.next().await.unwrap();
1499 assert_eq!(headers, Ok(vec![p0]));
1500 let headers = headers.unwrap();
1501 assert_eq!(headers.capacity(), headers.len());
1502
1503 let headers = downloader.next().await.unwrap();
1504 assert_eq!(headers, Ok(vec![p1]));
1505 let headers = headers.unwrap();
1506 assert_eq!(headers.capacity(), headers.len());
1507
1508 let headers = downloader.next().await.unwrap();
1509 assert_eq!(headers, Ok(vec![p2]));
1510 let headers = headers.unwrap();
1511 assert_eq!(headers.capacity(), headers.len());
1512
1513 assert!(downloader.next().await.is_none());
1514 }
1515
1516 #[tokio::test]
1517 async fn download_one_by_one_larger_request_limit() {
1518 reth_tracing::init_test_tracing();
1519 let p3 = SealedHeader::default();
1520 let p2 = child_header(&p3);
1521 let p1 = child_header(&p2);
1522 let p0 = child_header(&p1);
1523
1524 let client = Arc::new(TestHeadersClient::default());
1525 let mut downloader = ReverseHeadersDownloaderBuilder::default()
1526 .stream_batch_size(1)
1527 .request_limit(3)
1528 .build(Arc::clone(&client), Arc::new(TestConsensus::default()));
1529 downloader.update_local_head(p3.clone());
1530 downloader.update_sync_target(SyncTarget::Tip(p0.hash()));
1531
1532 client
1533 .extend(vec![
1534 p0.as_ref().clone(),
1535 p1.as_ref().clone(),
1536 p2.as_ref().clone(),
1537 p3.as_ref().clone(),
1538 ])
1539 .await;
1540
1541 let headers = downloader.next().await.unwrap();
1542 assert_eq!(headers, Ok(vec![p0]));
1543 let headers = headers.unwrap();
1544 assert_eq!(headers.capacity(), headers.len());
1545
1546 let headers = downloader.next().await.unwrap();
1547 assert_eq!(headers, Ok(vec![p1]));
1548 let headers = headers.unwrap();
1549 assert_eq!(headers.capacity(), headers.len());
1550
1551 let headers = downloader.next().await.unwrap();
1552 assert_eq!(headers, Ok(vec![p2]));
1553 let headers = headers.unwrap();
1554 assert_eq!(headers.capacity(), headers.len());
1555
1556 assert!(downloader.next().await.is_none());
1557 }
1558}