1use super::task::TaskDownloader;
4use crate::metrics::HeaderDownloaderMetrics;
5use alloy_consensus::BlockHeader;
6use alloy_eips::BlockHashOrNumber;
7use alloy_primitives::{BlockNumber, Sealable, B256};
8use futures::{stream::Stream, FutureExt};
9use futures_util::{stream::FuturesUnordered, StreamExt};
10use rayon::prelude::*;
11use reth_config::config::HeadersConfig;
12use reth_consensus::HeaderValidator;
13use reth_network_p2p::{
14 error::{DownloadError, DownloadResult, PeerRequestResult},
15 headers::{
16 client::{HeadersClient, HeadersRequest},
17 downloader::{validate_header_download, HeaderDownloader, SyncTarget},
18 error::{HeadersDownloaderError, HeadersDownloaderResult},
19 },
20 priority::Priority,
21};
22use reth_network_peers::PeerId;
23use reth_primitives_traits::{GotExpected, SealedHeader};
24use reth_tasks::{TaskSpawner, TokioTaskExecutor};
25use std::{
26 cmp::{Ordering, Reverse},
27 collections::{binary_heap::PeekMut, BinaryHeap},
28 future::Future,
29 pin::Pin,
30 sync::Arc,
31 task::{ready, Context, Poll},
32};
33use thiserror::Error;
34use tracing::{debug, error, trace};
35
/// Multiplier applied to the number of connected peers when computing the dynamic number of
/// concurrent requests (see `concurrent_request_limit`).
const REQUESTS_PER_PEER_MULTIPLIER: usize = 5;
40
/// Internal error type used while processing header responses.
///
/// Both variants are transparent: they display exactly as the wrapped error does.
#[derive(Error, Debug)]
enum ReverseHeadersDownloaderError<H: Sealable> {
    /// A downloader-level error. `poll_next` clears the downloader state and yields it to the
    /// stream consumer.
    #[error(transparent)]
    Downloader(#[from] HeadersDownloaderError<H>),
    /// An error tied to a single request. It is boxed to keep this enum small; `poll_next`
    /// handles it by penalizing the peer (if known) and re-submitting the request.
    #[error(transparent)]
    Response(#[from] Box<HeadersResponseError>),
}
49
50impl<H: Sealable> From<HeadersResponseError> for ReverseHeadersDownloaderError<H> {
51 fn from(value: HeadersResponseError) -> Self {
52 Self::Response(Box::new(value))
53 }
54}
55
/// Downloads headers in reverse: from the sync target down towards the local head.
///
/// The type is a [`Stream`] that yields batches of validated [`SealedHeader`]s ordered from the
/// highest to the lowest block number. Both the local head and the sync target must be set
/// before the stream makes progress.
#[must_use = "Stream does nothing unless polled"]
#[derive(Debug)]
pub struct ReverseHeadersDownloader<H: HeadersClient> {
    /// Validator used to check downloaded headers.
    consensus: Arc<dyn HeaderValidator<H::Header>>,
    /// Client used to request headers from peers.
    client: Arc<H>,
    /// The local head; requests are only issued for blocks above it.
    local_head: Option<SealedHeader<H::Header>>,
    /// The block the downloader syncs down from.
    sync_target: Option<SyncTargetBlock>,
    /// Start block number of the next request to send (decreases as requests are issued).
    next_request_block_number: u64,
    /// Lowest header that was already yielded; remembered when a yielded batch drains the queue
    /// so later responses can still be validated against it.
    lowest_validated_header: Option<SealedHeader<H::Header>>,
    /// Block number the next response to validate must start at.
    next_chain_tip_block_number: u64,
    /// Maximum number of headers requested per request.
    request_limit: u64,
    /// Lower bound used when computing the concurrent request limit.
    min_concurrent_requests: usize,
    /// Upper bound used when computing the concurrent request limit.
    max_concurrent_requests: usize,
    /// Number of headers per yielded batch.
    stream_batch_size: usize,
    /// No new requests are submitted once this many responses are buffered.
    max_buffered_responses: usize,
    /// In-flight request for the sync target header; it is driven to completion before any
    /// batched request is polled.
    sync_target_request: Option<HeadersRequestFuture<H::Output>>,
    /// Requests currently in flight.
    in_progress_queue: FuturesUnordered<HeadersRequestFuture<H::Output>>,
    /// Responses that arrived out of order, keyed by their request's start block number.
    buffered_responses: BinaryHeap<OrderedHeadersResponse<H::Header>>,
    /// Validated headers (highest block first) that have not been yielded yet.
    queued_validated_headers: Vec<SealedHeader<H::Header>>,
    /// Downloader metrics.
    metrics: HeaderDownloaderMetrics,
}
110
111impl<H> ReverseHeadersDownloader<H>
114where
115 H: HeadersClient<Header: reth_primitives_traits::BlockHeader> + 'static,
116{
117 pub fn builder() -> ReverseHeadersDownloaderBuilder {
119 ReverseHeadersDownloaderBuilder::default()
120 }
121
122 #[inline]
124 fn local_block_number(&self) -> Option<BlockNumber> {
125 self.local_head.as_ref().map(|h| h.number())
126 }
127
128 #[inline]
134 fn existing_local_block_number(&self) -> BlockNumber {
135 self.local_head.as_ref().expect("is initialized").number()
136 }
137
138 #[inline]
144 fn existing_sync_target(&self) -> SyncTargetBlock {
145 self.sync_target.as_ref().expect("is initialized").clone()
146 }
147
148 #[inline]
153 fn concurrent_request_limit(&self) -> usize {
154 let num_peers = self.client.num_connected_peers();
155
156 let dynamic_target = num_peers * REQUESTS_PER_PEER_MULTIPLIER;
159 let max_dynamic = dynamic_target.max(self.min_concurrent_requests);
160
161 if num_peers < self.min_concurrent_requests {
163 return max_dynamic
164 }
165
166 max_dynamic.min(self.max_concurrent_requests)
167 }
168
169 fn next_request(&mut self) -> Option<HeadersRequest> {
175 if let Some(local_head) = self.local_block_number() {
176 if self.next_request_block_number > local_head {
177 let request = calc_next_request(
178 local_head,
179 self.next_request_block_number,
180 self.request_limit,
181 );
182 self.next_request_block_number -= request.limit;
185
186 return Some(request)
187 }
188 }
189
190 None
191 }
192
193 fn lowest_validated_header(&self) -> Option<&SealedHeader<H::Header>> {
203 self.queued_validated_headers.last().or(self.lowest_validated_header.as_ref())
204 }
205
206 fn reset(&mut self) {
210 debug!(target: "downloaders::headers", "Resetting headers downloader");
211 self.next_request_block_number = 0;
212 self.next_chain_tip_block_number = 0;
213 self.sync_target.take();
214 }
215
216 fn validate_sync_target(
218 &self,
219 header: &SealedHeader<H::Header>,
220 request: HeadersRequest,
221 peer_id: PeerId,
222 ) -> Result<(), Box<HeadersResponseError>> {
223 match self.existing_sync_target() {
224 SyncTargetBlock::Hash(hash) | SyncTargetBlock::HashAndNumber { hash, .. }
225 if header.hash() != hash =>
226 {
227 Err(Box::new(HeadersResponseError {
228 request,
229 peer_id: Some(peer_id),
230 error: DownloadError::InvalidTip(
231 GotExpected { got: header.hash(), expected: hash }.into(),
232 ),
233 }))
234 }
235 SyncTargetBlock::Number(number) if header.number() != number => {
236 Err(Box::new(HeadersResponseError {
237 request,
238 peer_id: Some(peer_id),
239 error: DownloadError::InvalidTipNumber(GotExpected {
240 got: header.number(),
241 expected: number,
242 }),
243 }))
244 }
245 _ => Ok(()),
246 }
247 }
248
249 fn process_next_headers(
257 &mut self,
258 request: HeadersRequest,
259 headers: Vec<H::Header>,
260 peer_id: PeerId,
261 ) -> Result<(), ReverseHeadersDownloaderError<H::Header>> {
262 let mut validated = Vec::with_capacity(headers.len());
263
264 let sealed_headers =
265 headers.into_par_iter().map(SealedHeader::seal_slow).collect::<Vec<_>>();
266 for parent in sealed_headers {
267 if let Some(validated_header) =
269 validated.last().or_else(|| self.lowest_validated_header())
270 {
271 if let Err(error) = self.validate(validated_header, &parent) {
272 trace!(target: "downloaders::headers", %error ,"Failed to validate header");
273 return Err(
274 HeadersResponseError { request, peer_id: Some(peer_id), error }.into()
275 )
276 }
277 } else {
278 self.validate_sync_target(&parent, request.clone(), peer_id)?;
279 }
280
281 validated.push(parent);
282 }
283
284 if let Some((last_header, head)) = validated
286 .last_mut()
287 .zip(self.local_head.as_ref())
288 .filter(|(last, head)| last.number() == head.number() + 1)
289 {
290 if let Err(error) = self.consensus.validate_header(&*last_header) {
292 trace!(target: "downloaders::headers", %error, "Failed to validate header");
293 return Err(HeadersResponseError {
294 request,
295 peer_id: Some(peer_id),
296 error: DownloadError::HeaderValidation {
297 hash: head.hash(),
298 number: head.number(),
299 error: Box::new(error),
300 },
301 }
302 .into())
303 }
304
305 if let Err(error) = self.consensus.validate_header_against_parent(&*last_header, head) {
312 let local_head = head.clone();
313 error!(target: "downloaders::headers", %error, number = last_header.number(), hash = ?last_header.hash(), "Header cannot be attached to known canonical chain");
315
316 self.reset();
321
322 return Err(HeadersDownloaderError::DetachedHead {
323 local_head: Box::new(local_head),
324 header: Box::new(last_header.clone()),
325 error: Box::new(error),
326 }
327 .into())
328 }
329 }
330
331 self.next_chain_tip_block_number =
333 validated.last().expect("exists").number().saturating_sub(1);
334 self.queued_validated_headers.extend(validated);
335
336 Ok(())
337 }
338
339 fn on_block_number_update(&mut self, target_block_number: u64, next_block: u64) {
350 if let Some(old_target) =
352 self.sync_target.as_mut().and_then(|t| t.replace_number(target_block_number))
353 {
354 if target_block_number > old_target {
355 self.next_request_block_number = next_block;
358 self.next_chain_tip_block_number = next_block;
359 self.clear();
360 } else {
361 let skip = self
363 .queued_validated_headers
364 .iter()
365 .take_while(|last| last.number() > target_block_number)
366 .count();
367 self.queued_validated_headers.drain(..skip);
369 }
370 } else {
371 self.next_request_block_number = next_block;
373 self.next_chain_tip_block_number = next_block;
374 }
375 }
376
377 fn on_sync_target_outcome(
379 &mut self,
380 response: HeadersRequestOutcome<H::Header>,
381 ) -> Result<(), ReverseHeadersDownloaderError<H::Header>> {
382 let sync_target = self.existing_sync_target();
383 let HeadersRequestOutcome { request, outcome } = response;
384 match outcome {
385 Ok(res) => {
386 let (peer_id, mut headers) = res.split();
387
388 self.metrics.total_downloaded.increment(headers.len() as u64);
390
391 headers.sort_unstable_by_key(|h| Reverse(h.number()));
393
394 if headers.is_empty() {
395 return Err(HeadersResponseError {
396 request,
397 peer_id: Some(peer_id),
398 error: DownloadError::EmptyResponse,
399 }
400 .into())
401 }
402
403 let header = headers.swap_remove(0);
404 let target = SealedHeader::seal_slow(header);
405
406 match sync_target {
407 SyncTargetBlock::Hash(hash) | SyncTargetBlock::HashAndNumber { hash, .. } => {
408 if target.hash() != hash {
409 return Err(HeadersResponseError {
410 request,
411 peer_id: Some(peer_id),
412 error: DownloadError::InvalidTip(
413 GotExpected { got: target.hash(), expected: hash }.into(),
414 ),
415 }
416 .into())
417 }
418 }
419 SyncTargetBlock::Number(number) => {
420 if target.number() != number {
421 return Err(HeadersResponseError {
422 request,
423 peer_id: Some(peer_id),
424 error: DownloadError::InvalidTipNumber(GotExpected {
425 got: target.number(),
426 expected: number,
427 }),
428 }
429 .into())
430 }
431 }
432 }
433
434 trace!(target: "downloaders::headers", head=?self.local_block_number(), hash=?target.hash(), number=%target.number(), "Received sync target");
435
436 let parent_block_number = target.number().saturating_sub(1);
438 self.on_block_number_update(target.number(), parent_block_number);
439
440 self.queued_validated_headers.push(target);
441
442 self.try_validate_buffered()
444 .map(Err::<(), ReverseHeadersDownloaderError<H::Header>>)
445 .transpose()?;
446
447 Ok(())
448 }
449 Err(err) => {
450 Err(HeadersResponseError { request, peer_id: None, error: err.into() }.into())
451 }
452 }
453 }
454
455 fn on_headers_outcome(
457 &mut self,
458 response: HeadersRequestOutcome<H::Header>,
459 ) -> Result<(), ReverseHeadersDownloaderError<H::Header>> {
460 let requested_block_number = response.block_number();
461 let HeadersRequestOutcome { request, outcome } = response;
462
463 match outcome {
464 Ok(res) => {
465 let (peer_id, mut headers) = res.split();
466
467 self.metrics.total_downloaded.increment(headers.len() as u64);
469
470 trace!(target: "downloaders::headers", len=%headers.len(), "Received headers response");
471
472 if headers.is_empty() {
473 return Err(HeadersResponseError {
474 request,
475 peer_id: Some(peer_id),
476 error: DownloadError::EmptyResponse,
477 }
478 .into())
479 }
480
481 if (headers.len() as u64) != request.limit {
482 return Err(HeadersResponseError {
483 peer_id: Some(peer_id),
484 error: DownloadError::HeadersResponseTooShort(GotExpected {
485 got: headers.len() as u64,
486 expected: request.limit,
487 }),
488 request,
489 }
490 .into())
491 }
492
493 headers.sort_unstable_by_key(|h| Reverse(h.number()));
495
496 let highest = &headers[0];
498
499 trace!(target: "downloaders::headers", requested_block_number, highest=?highest.number(), "Validating non-empty headers response");
500
501 if highest.number() != requested_block_number {
502 return Err(HeadersResponseError {
503 request,
504 peer_id: Some(peer_id),
505 error: DownloadError::HeadersResponseStartBlockMismatch(GotExpected {
506 got: highest.number(),
507 expected: requested_block_number,
508 }),
509 }
510 .into())
511 }
512
513 if highest.number() == self.next_chain_tip_block_number {
515 self.process_next_headers(request, headers, peer_id)?;
517 self.try_validate_buffered()
519 .map(Err::<(), ReverseHeadersDownloaderError<H::Header>>)
520 .transpose()?;
521 } else if highest.number() > self.existing_local_block_number() {
522 self.metrics.buffered_responses.increment(1.);
523 self.buffered_responses.push(OrderedHeadersResponse {
525 headers,
526 request,
527 peer_id,
528 })
529 }
530
531 Ok(())
532 }
533 Err(err) => {
536 trace!(target: "downloaders::headers", %err, "Response error");
537 Err(HeadersResponseError { request, peer_id: None, error: err.into() }.into())
538 }
539 }
540 }
541
542 fn penalize_peer(&self, peer_id: Option<PeerId>, error: &DownloadError) {
543 if let Some(peer_id) = peer_id {
545 trace!(target: "downloaders::headers", ?peer_id, %error, "Penalizing peer");
546 self.client.report_bad_message(peer_id);
547 }
548 }
549
550 fn on_headers_error(&self, err: Box<HeadersResponseError>) {
554 let HeadersResponseError { request, peer_id, error } = *err;
555
556 self.penalize_peer(peer_id, &error);
557
558 self.metrics.increment_errors(&error);
560
561 self.submit_request(request, Priority::High);
563 }
564
565 fn try_validate_buffered(&mut self) -> Option<ReverseHeadersDownloaderError<H::Header>> {
569 loop {
570 let next_response = self.buffered_responses.peek_mut()?;
572 let next_block_number = next_response.block_number();
573 match next_block_number.cmp(&self.next_chain_tip_block_number) {
574 Ordering::Less => return None,
575 Ordering::Equal => {
576 let OrderedHeadersResponse { headers, request, peer_id } =
577 PeekMut::pop(next_response);
578 self.metrics.buffered_responses.decrement(1.);
579
580 if let Err(err) = self.process_next_headers(request, headers, peer_id) {
581 return Some(err)
582 }
583 }
584 Ordering::Greater => {
585 self.metrics.buffered_responses.decrement(1.);
586 PeekMut::pop(next_response);
587 }
588 }
589 }
590 }
591
    /// Builds the request for the sync target: a falling request for a single header starting
    /// at `start`.
    const fn get_sync_target_request(&self, start: BlockHashOrNumber) -> HeadersRequest {
        HeadersRequest::falling(start, 1)
    }
596
597 fn submit_request(&self, request: HeadersRequest, priority: Priority) {
599 trace!(target: "downloaders::headers", ?request, "Submitting headers request");
600 self.in_progress_queue.push(self.request_fut(request, priority));
601 self.metrics.in_flight_requests.increment(1.);
602 }
603
604 fn request_fut(
605 &self,
606 request: HeadersRequest,
607 priority: Priority,
608 ) -> HeadersRequestFuture<H::Output> {
609 let client = Arc::clone(&self.client);
610 HeadersRequestFuture {
611 request: Some(request.clone()),
612 fut: client.get_headers_with_priority(request, priority),
613 }
614 }
615
    /// Validates `header` against `parent` by delegating to [`validate_header_download`] with
    /// this downloader's consensus implementation.
    fn validate(
        &self,
        header: &SealedHeader<H::Header>,
        parent: &SealedHeader<H::Header>,
    ) -> DownloadResult<()> {
        validate_header_download(&self.consensus, header, parent)
    }
624
625 fn clear(&mut self) {
627 self.lowest_validated_header.take();
628 self.queued_validated_headers = Vec::new();
629 self.buffered_responses = BinaryHeap::new();
630 self.in_progress_queue.clear();
631
632 self.metrics.in_flight_requests.set(0.);
633 self.metrics.buffered_responses.set(0.);
634 }
635
636 fn split_next_batch(&mut self) -> Vec<SealedHeader<H::Header>> {
638 let batch_size = self.stream_batch_size.min(self.queued_validated_headers.len());
639 let mut rem = self.queued_validated_headers.split_off(batch_size);
640 std::mem::swap(&mut rem, &mut self.queued_validated_headers);
641 rem.shrink_to_fit();
657 rem
658 }
659}
660
661impl<H> ReverseHeadersDownloader<H>
662where
663 H: HeadersClient,
664 Self: HeaderDownloader + 'static,
665{
666 pub fn into_task(self) -> TaskDownloader<<Self as HeaderDownloader>::Header> {
668 self.into_task_with(&TokioTaskExecutor::default())
669 }
670
671 pub fn into_task_with<S>(
673 self,
674 spawner: &S,
675 ) -> TaskDownloader<<Self as HeaderDownloader>::Header>
676 where
677 S: TaskSpawner,
678 {
679 TaskDownloader::spawn_with(self, spawner)
680 }
681}
682
683impl<H> HeaderDownloader for ReverseHeadersDownloader<H>
684where
685 H: HeadersClient<Header: reth_primitives_traits::BlockHeader> + 'static,
686{
687 type Header = H::Header;
688
689 fn update_local_head(&mut self, head: SealedHeader<H::Header>) {
690 while self
692 .queued_validated_headers
693 .last()
694 .is_some_and(|last| last.number() <= head.number())
695 {
696 self.queued_validated_headers.pop();
698 }
699 trace!(
700 target: "downloaders::headers",
701 head=?head.num_hash(),
702 "Updating local head"
703 );
704 self.local_head = Some(head);
706 }
707
708 fn update_sync_target(&mut self, target: SyncTarget) {
710 let current_tip = self.sync_target.as_ref().and_then(|t| t.hash());
711 trace!(
712 target: "downloaders::headers",
713 sync_target=?target,
714 current_tip=?current_tip,
715 "Updating sync target"
716 );
717 match target {
718 SyncTarget::Tip(tip) => {
719 if Some(tip) != current_tip {
720 trace!(target: "downloaders::headers", current=?current_tip, new=?tip, "Update sync target");
721 let new_sync_target = SyncTargetBlock::from_hash(tip);
722
723 if let Some(target_number) = self
726 .queued_validated_headers
727 .first()
728 .filter(|h| h.hash() == tip)
729 .map(|h| h.number())
730 {
731 self.sync_target = Some(new_sync_target.with_number(target_number));
732 return
733 }
734
735 trace!(target: "downloaders::headers", new=?target, "Request new sync target");
736 self.metrics.out_of_order_requests.increment(1);
737 self.sync_target = Some(new_sync_target);
738 self.sync_target_request = Some(
739 self.request_fut(self.get_sync_target_request(tip.into()), Priority::High),
740 );
741 }
742 }
743 SyncTarget::Gap(existing) => {
744 let target = existing.parent;
745 if Some(target) != current_tip {
746 self.sync_target_request.take();
748 let parent_block_number = existing.block.number.saturating_sub(1);
751
752 trace!(target: "downloaders::headers", current=?current_tip, new=?target, %parent_block_number, "Updated sync target");
753
754 self.sync_target = match self.sync_target.take() {
756 Some(sync_target) => Some(sync_target.with_hash(target)),
757 None => Some(SyncTargetBlock::from_hash(target)),
758 };
759 self.on_block_number_update(parent_block_number, parent_block_number);
760 }
761 }
762 SyncTarget::TipNum(num) => {
763 let current_tip_num = self.sync_target.as_ref().and_then(|t| t.number());
764 if Some(num) != current_tip_num {
765 trace!(target: "downloaders::headers", %num, "Updating sync target based on num");
766 self.sync_target = Some(SyncTargetBlock::from_number(num));
768 self.sync_target_request = Some(
769 self.request_fut(self.get_sync_target_request(num.into()), Priority::High),
770 );
771 }
772 }
773 }
774 }
775
    /// Sets the number of headers per yielded batch.
    fn set_batch_size(&mut self, batch_size: usize) {
        self.stream_batch_size = batch_size;
    }
779}
780
impl<H> Stream for ReverseHeadersDownloader<H>
where
    H: HeadersClient<Header: reth_primitives_traits::BlockHeader> + 'static,
{
    /// A batch of validated headers (highest block first), or a downloader error.
    type Item = HeadersDownloaderResult<Vec<SealedHeader<H::Header>>, H::Header>;

    /// Drives the download.
    ///
    /// 1. Returns `Pending` until both the local head and the sync target are set.
    /// 2. Drives a pending sync target request to completion, retrying it on response errors.
    /// 3. Loops: polls in-flight requests, submits new requests up to the concurrency and
    ///    buffer limits, and yields a batch once `stream_batch_size` headers are queued.
    /// 4. Once no request is in flight, flushes the remaining headers; if none remain, clears
    ///    the state and ends the stream.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();

        // Both boundaries are required before anything can be downloaded.
        if this.local_head.is_none() || this.sync_target.is_none() {
            trace!(
                target: "downloaders::headers",
                head=?this.local_block_number(),
                sync_target=?this.sync_target,
                "The downloader sync boundaries have not been set"
            );
            return Poll::Pending
        }

        // A pending sync target request must complete before batched requests are handled.
        while let Some(mut req) = this.sync_target_request.take() {
            match req.poll_unpin(cx) {
                Poll::Ready(outcome) => {
                    match this.on_sync_target_outcome(outcome) {
                        Ok(()) => break,
                        Err(ReverseHeadersDownloaderError::Response(error)) => {
                            trace!(target: "downloaders::headers", %error, "invalid sync target response");
                            // A closed channel ends the stream.
                            if error.is_channel_closed() {
                                return Poll::Ready(None)
                            }

                            // Penalize the peer and retry; the loop polls the new request.
                            this.penalize_peer(error.peer_id, &error.error);
                            this.metrics.increment_errors(&error.error);
                            this.sync_target_request =
                                Some(this.request_fut(error.request, Priority::High));
                        }
                        Err(ReverseHeadersDownloaderError::Downloader(error)) => {
                            this.clear();
                            return Poll::Ready(Some(Err(error)))
                        }
                    };
                }
                Poll::Pending => {
                    // Put the request back so it is polled again on the next call.
                    this.sync_target_request = Some(req);
                    return Poll::Pending
                }
            }
        }

        // Release unused capacity after handling the sync target outcome.
        this.buffered_responses.shrink_to_fit();

        loop {
            // Poll in-flight requests; this frees request slots and may queue validated headers.
            while let Poll::Ready(Some(outcome)) = this.in_progress_queue.poll_next_unpin(cx) {
                this.metrics.in_flight_requests.decrement(1.);
                match this.on_headers_outcome(outcome) {
                    Ok(()) => (),
                    Err(ReverseHeadersDownloaderError::Response(error)) => {
                        // A closed channel ends the stream.
                        if error.is_channel_closed() {
                            return Poll::Ready(None)
                        }
                        this.on_headers_error(error);
                    }
                    Err(ReverseHeadersDownloaderError::Downloader(error)) => {
                        this.clear();
                        return Poll::Ready(Some(Err(error)))
                    }
                };
            }

            // Release unused capacity after handling the responses.
            this.buffered_responses.shrink_to_fit();

            // Set when at least one request was submitted in this iteration.
            let mut progress = false;

            let concurrent_request_limit = this.concurrent_request_limit();
            // Submit requests until a limit is hit or nothing is left to request.
            while this.in_progress_queue.len() < concurrent_request_limit &&
                this.buffered_responses.len() < this.max_buffered_responses
            {
                if let Some(request) = this.next_request() {
                    trace!(
                        target: "downloaders::headers",
                        "Requesting headers {request:?}"
                    );
                    progress = true;
                    this.submit_request(request, Priority::Normal);
                } else {
                    break
                }
            }

            // Yield a batch as soon as enough validated headers are queued.
            if this.queued_validated_headers.len() >= this.stream_batch_size {
                let next_batch = this.split_next_batch();

                // If the batch drained the queue, remember its lowest header so following
                // responses can still be validated against it.
                if this.queued_validated_headers.is_empty() {
                    this.lowest_validated_header = next_batch.last().cloned();
                }

                trace!(target: "downloaders::headers", batch=%next_batch.len(), "Returning validated batch");

                this.metrics.total_flushed.increment(next_batch.len() as u64);
                return Poll::Ready(Some(Ok(next_batch)))
            }

            // Nothing new was submitted, so another iteration would not make progress.
            if !progress {
                break
            }
        }

        // No request is in flight: flush what is left, or finish the stream.
        if this.in_progress_queue.is_empty() {
            let next_batch = this.split_next_batch();
            if next_batch.is_empty() {
                this.clear();
                return Poll::Ready(None)
            }
            this.metrics.total_flushed.increment(next_batch.len() as u64);
            return Poll::Ready(Some(Ok(next_batch)))
        }

        Poll::Pending
    }
}
925
/// A request future paired with the request it was created for.
#[derive(Debug)]
struct HeadersRequestFuture<F> {
    /// The request; taken out when the future resolves.
    request: Option<HeadersRequest>,
    /// The underlying future resolving to the peer's response.
    fut: F,
}
932
933impl<F, H> Future for HeadersRequestFuture<F>
934where
935 F: Future<Output = PeerRequestResult<Vec<H>>> + Sync + Send + Unpin,
936{
937 type Output = HeadersRequestOutcome<H>;
938
939 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
940 let this = self.get_mut();
941 let outcome = ready!(this.fut.poll_unpin(cx));
942 let request = this.request.take().unwrap();
943
944 Poll::Ready(HeadersRequestOutcome { request, outcome })
945 }
946}
947
/// The outcome of a [`HeadersRequestFuture`].
struct HeadersRequestOutcome<H> {
    /// The request that was sent.
    request: HeadersRequest,
    /// The peer's response, or the request error.
    outcome: PeerRequestResult<Vec<H>>,
}
953
impl<H> HeadersRequestOutcome<H> {
    /// Returns the block number the request started at.
    ///
    /// # Panics
    ///
    /// Panics if the request start is not a block number.
    const fn block_number(&self) -> u64 {
        self.request.start.as_number().expect("is number")
    }
}
961
/// A buffered headers response, ordered by the start block number of its request.
#[derive(Debug)]
struct OrderedHeadersResponse<H> {
    /// The received headers.
    headers: Vec<H>,
    /// The request the headers were received for.
    request: HeadersRequest,
    /// The peer that sent the response.
    peer_id: PeerId,
}
969
impl<H> OrderedHeadersResponse<H> {
    /// Returns the block number the request started at.
    ///
    /// # Panics
    ///
    /// Panics if the request start is not a block number.
    const fn block_number(&self) -> u64 {
        self.request.start.as_number().expect("is number")
    }
}
977
978impl<H> PartialEq for OrderedHeadersResponse<H> {
979 fn eq(&self, other: &Self) -> bool {
980 self.block_number() == other.block_number()
981 }
982}
983
// Equality is based on the `u64` start block number only.
impl<H> Eq for OrderedHeadersResponse<H> {}
985
986impl<H> PartialOrd for OrderedHeadersResponse<H> {
987 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
988 Some(self.cmp(other))
989 }
990}
991
992impl<H> Ord for OrderedHeadersResponse<H> {
993 fn cmp(&self, other: &Self) -> Ordering {
994 self.block_number().cmp(&other.block_number())
995 }
996}
997
/// An error raised for a single headers request, carrying what is needed to retry it.
#[derive(Debug, Error)]
#[error("error requesting headers from peer {peer_id:?}: {error}; request: {request:?}")]
struct HeadersResponseError {
    /// The request that failed.
    request: HeadersRequest,
    /// The peer that sent the offending response; `None` if the request itself failed.
    peer_id: Option<PeerId>,
    /// The underlying error.
    #[source]
    error: DownloadError,
}
1007
1008impl HeadersResponseError {
1009 const fn is_channel_closed(&self) -> bool {
1011 if let DownloadError::RequestError(ref err) = self.error {
1012 return err.is_channel_closed()
1013 }
1014 false
1015 }
1016}
1017
/// The block to sync to, identified by hash, by number, or by both.
#[derive(Clone, Debug)]
pub enum SyncTargetBlock {
    /// Only the block hash is known.
    Hash(B256),
    /// Only the block number is known.
    Number(u64),
    /// Both the block hash and the block number are known.
    HashAndNumber {
        /// Block hash of the target.
        hash: B256,
        /// Block number of the target.
        number: u64,
    },
}
1034
1035impl SyncTargetBlock {
1036 const fn from_hash(hash: B256) -> Self {
1038 Self::Hash(hash)
1039 }
1040
1041 const fn from_number(num: u64) -> Self {
1043 Self::Number(num)
1044 }
1045
1046 const fn with_hash(self, hash: B256) -> Self {
1048 match self {
1049 Self::Hash(_) => Self::Hash(hash),
1050 Self::Number(number) | Self::HashAndNumber { number, .. } => {
1051 Self::HashAndNumber { hash, number }
1052 }
1053 }
1054 }
1055
1056 const fn with_number(self, number: u64) -> Self {
1058 match self {
1059 Self::Hash(hash) | Self::HashAndNumber { hash, .. } => {
1060 Self::HashAndNumber { hash, number }
1061 }
1062 Self::Number(_) => Self::Number(number),
1063 }
1064 }
1065
1066 const fn replace_number(&mut self, number: u64) -> Option<u64> {
1071 match self {
1072 Self::Hash(hash) => {
1073 *self = Self::HashAndNumber { hash: *hash, number };
1074 None
1075 }
1076 Self::Number(old_number) => {
1077 let res = Some(*old_number);
1078 *self = Self::Number(number);
1079 res
1080 }
1081 Self::HashAndNumber { number: old_number, hash } => {
1082 let res = Some(*old_number);
1083 *self = Self::HashAndNumber { hash: *hash, number };
1084 res
1085 }
1086 }
1087 }
1088
1089 const fn hash(&self) -> Option<B256> {
1091 match self {
1092 Self::Hash(hash) | Self::HashAndNumber { hash, .. } => Some(*hash),
1093 Self::Number(_) => None,
1094 }
1095 }
1096
1097 const fn number(&self) -> Option<u64> {
1099 match self {
1100 Self::Hash(_) => None,
1101 Self::Number(number) | Self::HashAndNumber { number, .. } => Some(*number),
1102 }
1103 }
1104}
1105
/// Builder for [`ReverseHeadersDownloader`].
#[derive(Debug)]
pub struct ReverseHeadersDownloaderBuilder {
    /// Maximum number of headers requested per request.
    request_limit: u64,
    /// Number of headers per yielded batch.
    stream_batch_size: usize,
    /// Lower bound used when computing the concurrent request limit.
    min_concurrent_requests: usize,
    /// Upper bound used when computing the concurrent request limit.
    max_concurrent_requests: usize,
    /// Number of buffered responses at which the downloader stops submitting new requests.
    max_buffered_responses: usize,
}
1121
1122impl ReverseHeadersDownloaderBuilder {
1123 pub fn new(config: HeadersConfig) -> Self {
1126 Self::default()
1127 .request_limit(config.downloader_request_limit)
1128 .min_concurrent_requests(config.downloader_min_concurrent_requests)
1129 .max_concurrent_requests(config.downloader_max_concurrent_requests)
1130 .max_buffered_responses(config.downloader_max_buffered_responses)
1131 .stream_batch_size(config.commit_threshold as usize)
1132 }
1133}
1134
impl Default for ReverseHeadersDownloaderBuilder {
    /// Defaults: batches of 10,000 headers, 1,000 headers per request, between 5 and 100
    /// concurrent requests and at most 100 buffered responses.
    fn default() -> Self {
        Self {
            stream_batch_size: 10_000,
            request_limit: 1_000,
            max_concurrent_requests: 100,
            min_concurrent_requests: 5,
            max_buffered_responses: 100,
        }
    }
}
1148
1149impl ReverseHeadersDownloaderBuilder {
1150 pub const fn request_limit(mut self, limit: u64) -> Self {
1155 self.request_limit = limit;
1156 self
1157 }
1158
1159 pub const fn stream_batch_size(mut self, size: usize) -> Self {
1165 self.stream_batch_size = size;
1166 self
1167 }
1168
1169 pub const fn min_concurrent_requests(mut self, min_concurrent_requests: usize) -> Self {
1174 self.min_concurrent_requests = min_concurrent_requests;
1175 self
1176 }
1177
1178 pub const fn max_concurrent_requests(mut self, max_concurrent_requests: usize) -> Self {
1182 self.max_concurrent_requests = max_concurrent_requests;
1183 self
1184 }
1185
1186 pub const fn max_buffered_responses(mut self, max_buffered_responses: usize) -> Self {
1193 self.max_buffered_responses = max_buffered_responses;
1194 self
1195 }
1196
1197 pub fn build<H>(
1200 self,
1201 client: H,
1202 consensus: Arc<dyn HeaderValidator<H::Header>>,
1203 ) -> ReverseHeadersDownloader<H>
1204 where
1205 H: HeadersClient + 'static,
1206 {
1207 let Self {
1208 request_limit,
1209 stream_batch_size,
1210 min_concurrent_requests,
1211 max_concurrent_requests,
1212 max_buffered_responses,
1213 } = self;
1214 ReverseHeadersDownloader {
1215 consensus,
1216 client: Arc::new(client),
1217 local_head: None,
1218 sync_target: None,
1219 next_request_block_number: 0,
1222 next_chain_tip_block_number: 0,
1223 lowest_validated_header: None,
1224 request_limit,
1225 min_concurrent_requests,
1226 max_concurrent_requests,
1227 stream_batch_size,
1228 max_buffered_responses,
1229 sync_target_request: None,
1230 in_progress_queue: Default::default(),
1231 buffered_responses: Default::default(),
1232 queued_validated_headers: Default::default(),
1233 metrics: Default::default(),
1234 }
1235 }
1236}
1237
1238#[inline]
1245fn calc_next_request(
1246 local_head: u64,
1247 next_request_block_number: u64,
1248 request_limit: u64,
1249) -> HeadersRequest {
1250 let diff = next_request_block_number - local_head;
1252 let limit = diff.min(request_limit);
1253 let start = next_request_block_number;
1254 HeadersRequest::falling(start.into(), limit)
1255}
1256
1257#[cfg(test)]
1258mod tests {
1259 use super::*;
1260 use crate::headers::test_utils::child_header;
1261 use alloy_consensus::Header;
1262 use alloy_eips::{eip1898::BlockWithParent, BlockNumHash};
1263 use assert_matches::assert_matches;
1264 use reth_consensus::test_utils::TestConsensus;
1265 use reth_network_p2p::test_utils::TestHeadersClient;
1266
1267 #[test]
1269 fn test_replace_number_semantics() {
1270 struct Fixture {
1271 sync_target_block: SyncTargetBlock,
1273 sync_target_option: Option<u64>,
1274
1275 replace_number: u64,
1277
1278 expected_result: Option<u64>,
1280
1281 new_number: u64,
1283 }
1284
1285 let fixtures = vec![
1286 Fixture {
1287 sync_target_block: SyncTargetBlock::Hash(B256::random()),
1288 sync_target_option: None,
1290 replace_number: 1,
1291 expected_result: None,
1292 new_number: 1,
1293 },
1294 Fixture {
1295 sync_target_block: SyncTargetBlock::Number(1),
1296 sync_target_option: Some(1),
1297 replace_number: 2,
1298 expected_result: Some(1),
1299 new_number: 2,
1300 },
1301 Fixture {
1302 sync_target_block: SyncTargetBlock::HashAndNumber {
1303 hash: B256::random(),
1304 number: 1,
1305 },
1306 sync_target_option: Some(1),
1307 replace_number: 2,
1308 expected_result: Some(1),
1309 new_number: 2,
1310 },
1311 ];
1312
1313 for fixture in fixtures {
1314 let mut sync_target_block = fixture.sync_target_block;
1315 let result = sync_target_block.replace_number(fixture.replace_number);
1316 assert_eq!(result, fixture.expected_result);
1317 assert_eq!(sync_target_block.number(), Some(fixture.new_number));
1318
1319 let mut sync_target_option = fixture.sync_target_option;
1320 let option_result = sync_target_option.replace(fixture.replace_number);
1321 assert_eq!(option_result, fixture.expected_result);
1322 assert_eq!(sync_target_option, Some(fixture.new_number));
1323 }
1324 }
1325
1326 #[test]
1328 fn test_sync_target_update() {
1329 let client = Arc::new(TestHeadersClient::default());
1330
1331 let genesis = SealedHeader::default();
1332
1333 let mut downloader = ReverseHeadersDownloaderBuilder::default()
1334 .build(Arc::clone(&client), Arc::new(TestConsensus::default()));
1335 downloader.update_local_head(genesis);
1336 downloader.update_sync_target(SyncTarget::Tip(B256::random()));
1337
1338 downloader.sync_target_request.take();
1339
1340 let target = SyncTarget::Tip(B256::random());
1341 downloader.update_sync_target(target);
1342 assert!(downloader.sync_target_request.is_some());
1343
1344 downloader.sync_target_request.take();
1345 let target = SyncTarget::Gap(BlockWithParent {
1346 block: BlockNumHash::new(0, B256::random()),
1347 parent: Default::default(),
1348 });
1349 downloader.update_sync_target(target);
1350 assert!(downloader.sync_target_request.is_none());
1351 assert_matches!(
1352 downloader.sync_target,
1353 Some(target) => target.number().is_some()
1354 );
1355 }
1356
1357 #[test]
1359 fn test_head_update() {
1360 let client = Arc::new(TestHeadersClient::default());
1361
1362 let header: SealedHeader = SealedHeader::default();
1363
1364 let mut downloader = ReverseHeadersDownloaderBuilder::default()
1365 .build(Arc::clone(&client), Arc::new(TestConsensus::default()));
1366 downloader.update_local_head(header.clone());
1367 downloader.update_sync_target(SyncTarget::Tip(B256::random()));
1368
1369 downloader.queued_validated_headers.push(header.clone());
1370 let mut next = header.as_ref().clone();
1371 next.number += 1;
1372 downloader.update_local_head(SealedHeader::new(next, B256::random()));
1373 assert!(downloader.queued_validated_headers.is_empty());
1374 }
1375
1376 #[test]
1377 fn test_request_calc() {
1378 let local = 0;
1380 let next = 1000;
1381 let batch_size = 2;
1382 let request = calc_next_request(local, next, batch_size);
1383 assert_eq!(request.start, next.into());
1384 assert_eq!(request.limit, batch_size);
1385
1386 let local = 999;
1388 let next = 1000;
1389 let batch_size = 2;
1390 let request = calc_next_request(local, next, batch_size);
1391 assert_eq!(request.start, next.into());
1392 assert_eq!(request.limit, 1);
1393 }
1394
1395 #[test]
1397 fn test_next_request() {
1398 let client = Arc::new(TestHeadersClient::default());
1399
1400 let genesis = SealedHeader::default();
1401
1402 let batch_size = 99;
1403 let start = 1000;
1404 let mut downloader = ReverseHeadersDownloaderBuilder::default()
1405 .request_limit(batch_size)
1406 .build(Arc::clone(&client), Arc::new(TestConsensus::default()));
1407 downloader.update_local_head(genesis);
1408 downloader.update_sync_target(SyncTarget::Tip(B256::random()));
1409
1410 downloader.next_request_block_number = start;
1411
1412 let mut total = 0;
1413 while let Some(req) = downloader.next_request() {
1414 assert_eq!(req.start, (start - total).into());
1415 total += req.limit;
1416 }
1417 assert_eq!(total, start);
1418 assert_eq!(Some(downloader.next_request_block_number), downloader.local_block_number());
1419 }
1420
1421 #[test]
1422 fn test_resp_order() {
1423 let mut heap = BinaryHeap::new();
1424 let hi = 1u64;
1425 heap.push(OrderedHeadersResponse::<Header> {
1426 headers: vec![],
1427 request: HeadersRequest { start: hi.into(), limit: 0, direction: Default::default() },
1428 peer_id: Default::default(),
1429 });
1430
1431 let lo = 0u64;
1432 heap.push(OrderedHeadersResponse {
1433 headers: vec![],
1434 request: HeadersRequest { start: lo.into(), limit: 0, direction: Default::default() },
1435 peer_id: Default::default(),
1436 });
1437
1438 assert_eq!(heap.pop().unwrap().block_number(), hi);
1439 assert_eq!(heap.pop().unwrap().block_number(), lo);
1440 }
1441
1442 #[tokio::test]
1443 async fn download_at_fork_head() {
1444 reth_tracing::init_test_tracing();
1445
1446 let client = Arc::new(TestHeadersClient::default());
1447
1448 let p3 = SealedHeader::default();
1449 let p2 = child_header(&p3);
1450 let p1 = child_header(&p2);
1451 let p0 = child_header(&p1);
1452
1453 let mut downloader = ReverseHeadersDownloaderBuilder::default()
1454 .stream_batch_size(3)
1455 .request_limit(3)
1456 .build(Arc::clone(&client), Arc::new(TestConsensus::default()));
1457 downloader.update_local_head(p3.clone());
1458 downloader.update_sync_target(SyncTarget::Tip(p0.hash()));
1459
1460 client
1461 .extend(vec![
1462 p0.as_ref().clone(),
1463 p1.as_ref().clone(),
1464 p2.as_ref().clone(),
1465 p3.as_ref().clone(),
1466 ])
1467 .await;
1468
1469 let headers = downloader.next().await.unwrap();
1470 assert_eq!(headers, Ok(vec![p0, p1, p2,]));
1471 assert!(downloader.buffered_responses.is_empty());
1472 assert!(downloader.next().await.is_none());
1473 assert!(downloader.next().await.is_none());
1474 }
1475
1476 #[tokio::test]
1477 async fn download_one_by_one() {
1478 reth_tracing::init_test_tracing();
1479 let p3 = SealedHeader::default();
1480 let p2 = child_header(&p3);
1481 let p1 = child_header(&p2);
1482 let p0 = child_header(&p1);
1483
1484 let client = Arc::new(TestHeadersClient::default());
1485 let mut downloader = ReverseHeadersDownloaderBuilder::default()
1486 .stream_batch_size(1)
1487 .request_limit(1)
1488 .build(Arc::clone(&client), Arc::new(TestConsensus::default()));
1489 downloader.update_local_head(p3.clone());
1490 downloader.update_sync_target(SyncTarget::Tip(p0.hash()));
1491
1492 client
1493 .extend(vec![
1494 p0.as_ref().clone(),
1495 p1.as_ref().clone(),
1496 p2.as_ref().clone(),
1497 p3.as_ref().clone(),
1498 ])
1499 .await;
1500
1501 let headers = downloader.next().await.unwrap();
1502 assert_eq!(headers, Ok(vec![p0]));
1503 let headers = headers.unwrap();
1504 assert_eq!(headers.capacity(), headers.len());
1505
1506 let headers = downloader.next().await.unwrap();
1507 assert_eq!(headers, Ok(vec![p1]));
1508 let headers = headers.unwrap();
1509 assert_eq!(headers.capacity(), headers.len());
1510
1511 let headers = downloader.next().await.unwrap();
1512 assert_eq!(headers, Ok(vec![p2]));
1513 let headers = headers.unwrap();
1514 assert_eq!(headers.capacity(), headers.len());
1515
1516 assert!(downloader.next().await.is_none());
1517 }
1518
1519 #[tokio::test]
1520 async fn download_one_by_one_larger_request_limit() {
1521 reth_tracing::init_test_tracing();
1522 let p3 = SealedHeader::default();
1523 let p2 = child_header(&p3);
1524 let p1 = child_header(&p2);
1525 let p0 = child_header(&p1);
1526
1527 let client = Arc::new(TestHeadersClient::default());
1528 let mut downloader = ReverseHeadersDownloaderBuilder::default()
1529 .stream_batch_size(1)
1530 .request_limit(3)
1531 .build(Arc::clone(&client), Arc::new(TestConsensus::default()));
1532 downloader.update_local_head(p3.clone());
1533 downloader.update_sync_target(SyncTarget::Tip(p0.hash()));
1534
1535 client
1536 .extend(vec![
1537 p0.as_ref().clone(),
1538 p1.as_ref().clone(),
1539 p2.as_ref().clone(),
1540 p3.as_ref().clone(),
1541 ])
1542 .await;
1543
1544 let headers = downloader.next().await.unwrap();
1545 assert_eq!(headers, Ok(vec![p0]));
1546 let headers = headers.unwrap();
1547 assert_eq!(headers.capacity(), headers.len());
1548
1549 let headers = downloader.next().await.unwrap();
1550 assert_eq!(headers, Ok(vec![p1]));
1551 let headers = headers.unwrap();
1552 assert_eq!(headers.capacity(), headers.len());
1553
1554 let headers = downloader.next().await.unwrap();
1555 assert_eq!(headers, Ok(vec![p2]));
1556 let headers = headers.unwrap();
1557 assert_eq!(headers.capacity(), headers.len());
1558
1559 assert!(downloader.next().await.is_none());
1560 }
1561}