Skip to main content

reth_downloaders/headers/
reverse_headers.rs

1//! A headers downloader that can handle multiple requests concurrently.
2
3use super::task::TaskDownloader;
4use crate::metrics::HeaderDownloaderMetrics;
5use alloy_consensus::BlockHeader;
6use alloy_eips::BlockHashOrNumber;
7use alloy_primitives::{BlockNumber, Sealable, B256};
8use futures::{stream::Stream, FutureExt};
9use futures_util::{stream::FuturesUnordered, StreamExt};
10use rayon::prelude::*;
11use reth_config::config::HeadersConfig;
12use reth_consensus::HeaderValidator;
13use reth_network_p2p::{
14    error::{DownloadError, DownloadResult, PeerRequestResult},
15    headers::{
16        client::{HeadersClient, HeadersRequest},
17        downloader::{validate_header_download, HeaderDownloader, SyncTarget},
18        error::{HeadersDownloaderError, HeadersDownloaderResult},
19    },
20    priority::Priority,
21};
22use reth_network_peers::PeerId;
23use reth_primitives_traits::{GotExpected, SealedHeader};
24use reth_tasks::Runtime;
25use std::{
26    cmp::{Ordering, Reverse},
27    collections::{binary_heap::PeekMut, BinaryHeap},
28    future::Future,
29    pin::Pin,
30    sync::Arc,
31    task::{ready, Context, Poll},
32};
33use thiserror::Error;
34use tracing::{debug, error, trace};
35
36/// A heuristic that is used to determine the number of requests that should be prepared for a peer.
37/// This should ensure that there are always requests lined up for peers to handle while the
38/// downloader is yielding a next batch of headers that is being committed to the database.
39const REQUESTS_PER_PEER_MULTIPLIER: usize = 5;
40
/// Wrapper for internal downloader errors.
#[derive(Error, Debug)]
enum ReverseHeadersDownloaderError<H: Sealable> {
    /// A downloader-level error, e.g. a detached head that requires the caller to unwind.
    #[error(transparent)]
    Downloader(#[from] HeadersDownloaderError<H>),
    /// A per-request error; the request is re-submitted after penalizing the peer.
    #[error(transparent)]
    Response(#[from] Box<HeadersResponseError>),
}
49
impl<H: Sealable> From<HeadersResponseError> for ReverseHeadersDownloaderError<H> {
    /// Boxes the response error so the enum variant stays small.
    fn from(value: HeadersResponseError) -> Self {
        Self::Response(Box::new(value))
    }
}
55
/// Downloads headers concurrently.
///
/// This [`HeaderDownloader`] downloads headers using the configured [`HeadersClient`].
/// Headers can be requested by hash or block number and take a `limit` parameter. This downloader
/// tries to fill the gap between the local head of the node and the chain tip by issuing multiple
/// requests at a time but yielding them in batches on [`Stream::poll_next`].
///
/// **Note:** This downloader downloads in reverse, see also
/// [`reth_network_p2p::headers::client::HeadersDirection`], this means the batches of headers that
/// this downloader yields will start at the chain tip and move towards the local head: falling
/// block numbers.
#[must_use = "Stream does nothing unless polled"]
#[derive(Debug)]
pub struct ReverseHeadersDownloader<H: HeadersClient> {
    /// Consensus client used to validate headers
    consensus: Arc<dyn HeaderValidator<H::Header>>,
    /// Client used to download headers.
    client: Arc<H>,
    /// The local head of the chain.
    local_head: Option<SealedHeader<H::Header>>,
    /// Block we want to close the gap to.
    sync_target: Option<SyncTargetBlock>,
    /// The block number to use for requests.
    ///
    /// Moves towards the local head as requests are issued (falling block numbers).
    next_request_block_number: u64,
    /// Keeps track of the block we need to validate next.
    lowest_validated_header: Option<SealedHeader<H::Header>>,
    /// Tip block number to start validating from (in reverse)
    next_chain_tip_block_number: u64,
    /// The batch size per one request
    request_limit: u64,
    /// Minimum amount of requests to handle concurrently.
    min_concurrent_requests: usize,
    /// Maximum amount of requests to handle concurrently.
    max_concurrent_requests: usize,
    /// The number of block headers to return at once
    stream_batch_size: usize,
    /// Maximum amount of received headers to buffer internally.
    max_buffered_responses: usize,
    /// Contains the request to retrieve the headers for the sync target
    ///
    /// This will give us the block number of the `sync_target`, after which we can send multiple
    /// requests at a time.
    sync_target_request: Option<HeadersRequestFuture<H::Output>>,
    /// Headers requests currently in flight.
    in_progress_queue: FuturesUnordered<HeadersRequestFuture<H::Output>>,
    /// Buffered, unvalidated responses
    buffered_responses: BinaryHeap<OrderedHeadersResponse<H::Header>>,
    /// Buffered, _sorted_ and validated headers ready to be returned.
    ///
    /// Note: headers are sorted from high to low
    queued_validated_headers: Vec<SealedHeader<H::Header>>,
    /// Header downloader metrics.
    metrics: HeaderDownloaderMetrics,
}
110
111// === impl ReverseHeadersDownloader ===
112
113impl<H> ReverseHeadersDownloader<H>
114where
115    H: HeadersClient<Header: reth_primitives_traits::BlockHeader> + 'static,
116{
117    /// Convenience method to create a [`ReverseHeadersDownloaderBuilder`] without importing it
118    pub fn builder() -> ReverseHeadersDownloaderBuilder {
119        ReverseHeadersDownloaderBuilder::default()
120    }
121
122    /// Returns the block number the local node is at.
123    #[inline]
124    fn local_block_number(&self) -> Option<BlockNumber> {
125        self.local_head.as_ref().map(|h| h.number())
126    }
127
128    /// Returns the existing local head block number
129    ///
130    /// # Panics
131    ///
132    /// If the local head has not been set.
133    #[inline]
134    fn existing_local_block_number(&self) -> BlockNumber {
135        self.local_head.as_ref().expect("is initialized").number()
136    }
137
138    /// Returns the existing sync target.
139    ///
140    /// # Panics
141    ///
142    /// If the sync target has never been set.
143    #[inline]
144    fn existing_sync_target(&self) -> SyncTargetBlock {
145        self.sync_target.as_ref().expect("is initialized").clone()
146    }
147
148    /// Max requests to handle at the same time
149    ///
150    /// This depends on the number of active peers but will always be
151    /// `min_concurrent_requests..max_concurrent_requests`
152    #[inline]
153    fn concurrent_request_limit(&self) -> usize {
154        let num_peers = self.client.num_connected_peers();
155
156        // we try to keep more requests than available peers active so that there's always a
157        // followup request available for a peer
158        let dynamic_target = num_peers * REQUESTS_PER_PEER_MULTIPLIER;
159        let max_dynamic = dynamic_target.max(self.min_concurrent_requests);
160
161        // If only a few peers are connected we keep it low
162        if num_peers < self.min_concurrent_requests {
163            return max_dynamic
164        }
165
166        max_dynamic.min(self.max_concurrent_requests)
167    }
168
169    /// Returns the next header request
170    ///
171    /// This will advance the current block towards the local head.
172    ///
173    /// Returns `None` if no more requests are required.
174    fn next_request(&mut self) -> Option<HeadersRequest> {
175        if let Some(local_head) = self.local_block_number() &&
176            self.next_request_block_number > local_head
177        {
178            let request =
179                calc_next_request(local_head, self.next_request_block_number, self.request_limit);
180            // need to shift the tracked request block number based on the number of requested
181            // headers so follow-up requests will use that as start.
182            self.next_request_block_number -= request.limit;
183
184            return Some(request)
185        }
186
187        None
188    }
189
190    /// Returns the next header to use for validation.
191    ///
192    /// Since this downloader downloads blocks with falling block number, this will return the
193    /// lowest (in terms of block number) validated header.
194    ///
195    /// This is either the last `queued_validated_headers`, or if has been drained entirely the
196    /// `lowest_validated_header`.
197    ///
198    /// This only returns `None` if we haven't fetched the initial chain tip yet.
199    fn lowest_validated_header(&self) -> Option<&SealedHeader<H::Header>> {
200        self.queued_validated_headers.last().or(self.lowest_validated_header.as_ref())
201    }
202
203    /// Resets the request trackers and clears the sync target.
204    ///
205    /// This ensures the downloader will restart after a new sync target has been set.
206    fn reset(&mut self) {
207        debug!(target: "downloaders::headers", "Resetting headers downloader");
208        self.next_request_block_number = 0;
209        self.next_chain_tip_block_number = 0;
210        self.sync_target.take();
211    }
212
213    /// Validate that the received header matches the expected sync target.
214    fn validate_sync_target(
215        &self,
216        header: &SealedHeader<H::Header>,
217        request: HeadersRequest,
218        peer_id: PeerId,
219    ) -> Result<(), Box<HeadersResponseError>> {
220        match self.existing_sync_target() {
221            SyncTargetBlock::Hash(hash) | SyncTargetBlock::HashAndNumber { hash, .. }
222                if header.hash() != hash =>
223            {
224                Err(Box::new(HeadersResponseError {
225                    request,
226                    peer_id: Some(peer_id),
227                    error: DownloadError::InvalidTip(
228                        GotExpected { got: header.hash(), expected: hash }.into(),
229                    ),
230                }))
231            }
232            SyncTargetBlock::Number(number) if header.number() != number => {
233                Err(Box::new(HeadersResponseError {
234                    request,
235                    peer_id: Some(peer_id),
236                    error: DownloadError::InvalidTipNumber(GotExpected {
237                        got: header.number(),
238                        expected: number,
239                    }),
240                }))
241            }
242            _ => Ok(()),
243        }
244    }
245
    /// Processes the next headers in line.
    ///
    /// This will validate all headers and insert them into the validated buffer.
    ///
    /// Returns an error if the given headers are invalid.
    ///
    /// Caution: this expects the `headers` to be sorted with _falling_ block numbers
    fn process_next_headers(
        &mut self,
        request: HeadersRequest,
        headers: Vec<H::Header>,
        peer_id: PeerId,
    ) -> Result<(), ReverseHeadersDownloaderError<H::Header>> {
        let mut validated = Vec::with_capacity(headers.len());

        // seal (hash) all headers in parallel via rayon before validating them in order
        let sealed_headers =
            headers.into_par_iter().map(SealedHeader::seal_slow).collect::<Vec<_>>();
        for parent in sealed_headers {
            // Validate that the header is the parent header of the last validated header.
            if let Some(validated_header) =
                validated.last().or_else(|| self.lowest_validated_header())
            {
                if let Err(error) = self.validate(validated_header, &parent) {
                    trace!(target: "downloaders::headers", %error ,"Failed to validate header");
                    return Err(
                        HeadersResponseError { request, peer_id: Some(peer_id), error }.into()
                    )
                }
            } else {
                // no validated header yet: this header must be the sync target itself
                self.validate_sync_target(&parent, request.clone(), peer_id)?;
            }

            validated.push(parent);
        }

        // If the last (smallest) validated header attaches to the local head, validate it.
        if let Some((last_header, head)) = validated
            .last_mut()
            .zip(self.local_head.as_ref())
            .filter(|(last, head)| last.number() == head.number() + 1)
        {
            // Every header must be valid on its own
            if let Err(error) = self.consensus.validate_header(&*last_header) {
                trace!(target: "downloaders::headers", %error, "Failed to validate header");
                return Err(HeadersResponseError {
                    request,
                    peer_id: Some(peer_id),
                    error: DownloadError::HeaderValidation {
                        hash: head.hash(),
                        number: head.number(),
                        error: Box::new(error),
                    },
                }
                .into())
            }

            // If the header is valid on its own, but not against its parent, we return it as
            // detached head error.
            // In stage sync this will trigger an unwind because this means that the local head
            // is not part of the chain the sync target is on. In other words, the downloader was
            // unable to connect the sync target with the local head because the sync target and
            // the local head are on different chains.
            if let Err(error) = self.consensus.validate_header_against_parent(&*last_header, head) {
                let local_head = head.clone();
                // Replace the last header with a detached variant
                error!(target: "downloaders::headers", %error, number = last_header.number(), hash = ?last_header.hash(), "Header cannot be attached to known canonical chain");

                // Reset trackers so that we can start over the next time the sync target is
                // updated.
                // The expected event flow when that happens is that the node will unwind the local
                // chain and restart the downloader.
                self.reset();

                return Err(HeadersDownloaderError::DetachedHead {
                    local_head: Box::new(local_head),
                    header: Box::new(last_header.clone()),
                    error: Box::new(error),
                }
                .into())
            }
        }

        // update tracked block info (falling block number)
        self.next_chain_tip_block_number =
            validated.last().expect("exists").number().saturating_sub(1);
        self.queued_validated_headers.extend(validated);

        Ok(())
    }
335
336    /// Updates the state based on the given `target_block_number`
337    ///
338    /// There are three different outcomes:
339    ///  * This is the first time this is called: current `sync_target` block is still `None`. In
340    ///    which case we're initializing the request trackers to `next_block`
341    ///  * The `target_block_number` is _higher_ than the current target. In which case we start
342    ///    over with a new range
343    ///  * The `target_block_number` is _lower_ than the current target or the _same_. In which case
344    ///    we don't need to update the request trackers but need to ensure already buffered headers
345    ///    are _not_ higher than the new `target_block_number`.
346    fn on_block_number_update(&mut self, target_block_number: u64, next_block: u64) {
347        // Update the trackers
348        if let Some(old_target) =
349            self.sync_target.as_mut().and_then(|t| t.replace_number(target_block_number))
350        {
351            if target_block_number > old_target {
352                // the new target is higher than the old target we need to update the
353                // request tracker and reset everything
354                self.next_request_block_number = next_block;
355                self.next_chain_tip_block_number = next_block;
356                self.clear();
357            } else {
358                // ensure already validated headers are in range
359                let skip = self
360                    .queued_validated_headers
361                    .iter()
362                    .take_while(|last| last.number() > target_block_number)
363                    .count();
364                // removes all headers that are higher than current target
365                self.queued_validated_headers.drain(..skip);
366            }
367        } else {
368            // this occurs on the initial sync target request
369            self.next_request_block_number = next_block;
370            self.next_chain_tip_block_number = next_block;
371        }
372    }
373
    /// Handles the response for the request for the sync target
    ///
    /// On success this resolves the sync target's block number, updates the request trackers
    /// and queues the target header. A header that doesn't match the configured target (by
    /// hash or number) is treated as a bad response.
    fn on_sync_target_outcome(
        &mut self,
        response: HeadersRequestOutcome<H::Header>,
    ) -> Result<(), ReverseHeadersDownloaderError<H::Header>> {
        let sync_target = self.existing_sync_target();
        let HeadersRequestOutcome { request, outcome } = response;
        match outcome {
            Ok(res) => {
                let (peer_id, mut headers) = res.split();

                // update total downloaded metric
                self.metrics.total_downloaded.increment(headers.len() as u64);

                // sort headers from highest to lowest block number
                headers.sort_unstable_by_key(|h| Reverse(h.number()));

                if headers.is_empty() {
                    return Err(HeadersResponseError {
                        request,
                        peer_id: Some(peer_id),
                        error: DownloadError::EmptyResponse,
                    }
                    .into())
                }

                // take the highest header and seal (hash) it
                let header = headers.swap_remove(0);
                let target = SealedHeader::seal_slow(header);

                // the received header must match the configured sync target
                match sync_target {
                    SyncTargetBlock::Hash(hash) | SyncTargetBlock::HashAndNumber { hash, .. } => {
                        if target.hash() != hash {
                            return Err(HeadersResponseError {
                                request,
                                peer_id: Some(peer_id),
                                error: DownloadError::InvalidTip(
                                    GotExpected { got: target.hash(), expected: hash }.into(),
                                ),
                            }
                            .into())
                        }
                    }
                    SyncTargetBlock::Number(number) => {
                        if target.number() != number {
                            return Err(HeadersResponseError {
                                request,
                                peer_id: Some(peer_id),
                                error: DownloadError::InvalidTipNumber(GotExpected {
                                    got: target.number(),
                                    expected: number,
                                }),
                            }
                            .into())
                        }
                    }
                }

                trace!(target: "downloaders::headers", head=?self.local_block_number(), hash=?target.hash(), number=%target.number(), "Received sync target");

                // This is the next block we need to start issuing requests from
                let parent_block_number = target.number().saturating_sub(1);
                self.on_block_number_update(target.number(), parent_block_number);

                self.queued_validated_headers.push(target);

                // try to validate all buffered responses blocked by this successful response
                self.try_validate_buffered()
                    .map(Err::<(), ReverseHeadersDownloaderError<H::Header>>)
                    .transpose()?;

                Ok(())
            }
            Err(err) => {
                Err(HeadersResponseError { request, peer_id: None, error: err.into() }.into())
            }
        }
    }
451
    /// Invoked when we received a response
    ///
    /// Validates the response and either processes it directly (if it is the next expected
    /// batch) or buffers it until the preceding batches have arrived.
    fn on_headers_outcome(
        &mut self,
        response: HeadersRequestOutcome<H::Header>,
    ) -> Result<(), ReverseHeadersDownloaderError<H::Header>> {
        let requested_block_number = response.block_number();
        let HeadersRequestOutcome { request, outcome } = response;

        match outcome {
            Ok(res) => {
                let (peer_id, mut headers) = res.split();

                // update total downloaded metric
                self.metrics.total_downloaded.increment(headers.len() as u64);

                trace!(target: "downloaders::headers", len=%headers.len(), "Received headers response");

                if headers.is_empty() {
                    return Err(HeadersResponseError {
                        request,
                        peer_id: Some(peer_id),
                        error: DownloadError::EmptyResponse,
                    }
                    .into())
                }

                // the peer must return exactly as many headers as requested
                if (headers.len() as u64) != request.limit {
                    return Err(HeadersResponseError {
                        peer_id: Some(peer_id),
                        error: DownloadError::HeadersResponseTooShort(GotExpected {
                            got: headers.len() as u64,
                            expected: request.limit,
                        }),
                        request,
                    }
                    .into())
                }

                // sort headers from highest to lowest block number
                headers.sort_unstable_by_key(|h| Reverse(h.number()));

                // validate the response
                let highest = &headers[0];

                trace!(target: "downloaders::headers", requested_block_number, highest=?highest.number(), "Validating non-empty headers response");

                // the highest header must match the block number the request started at
                if highest.number() != requested_block_number {
                    return Err(HeadersResponseError {
                        request,
                        peer_id: Some(peer_id),
                        error: DownloadError::HeadersResponseStartBlockMismatch(GotExpected {
                            got: highest.number(),
                            expected: requested_block_number,
                        }),
                    }
                    .into())
                }

                // check if the response is the next expected
                if highest.number() == self.next_chain_tip_block_number {
                    // is next response, validate it
                    self.process_next_headers(request, headers, peer_id)?;
                    // try to validate all buffered responses blocked by this successful response
                    self.try_validate_buffered()
                        .map(Err::<(), ReverseHeadersDownloaderError<H::Header>>)
                        .transpose()?;
                } else if highest.number() > self.existing_local_block_number() {
                    // out of order: buffer until the preceding responses arrive
                    self.metrics.buffered_responses.increment(1.);
                    // can't validate yet
                    self.buffered_responses.push(OrderedHeadersResponse {
                        headers,
                        request,
                        peer_id,
                    })
                }
                // responses at or below the local head are no longer needed and are dropped

                Ok(())
            }
            // most likely a noop, because this error
            // would've been handled by the fetcher internally
            Err(err) => {
                trace!(target: "downloaders::headers", %err, "Response error");
                Err(HeadersResponseError { request, peer_id: None, error: err.into() }.into())
            }
        }
    }
538
539    fn penalize_peer(&self, peer_id: Option<PeerId>, error: &DownloadError) {
540        // Penalize the peer for bad response
541        if let Some(peer_id) = peer_id {
542            trace!(target: "downloaders::headers", ?peer_id, %error, "Penalizing peer");
543            self.client.report_bad_message(peer_id);
544        }
545    }
546
547    /// Handles the error of a bad response
548    ///
549    /// This will re-submit the request.
550    fn on_headers_error(&self, err: Box<HeadersResponseError>) {
551        let HeadersResponseError { request, peer_id, error } = *err;
552
553        self.penalize_peer(peer_id, &error);
554
555        // Update error metric
556        self.metrics.increment_errors(&error);
557
558        // Re-submit the request
559        self.submit_request(request, Priority::High);
560    }
561
    /// Attempts to validate the buffered responses
    ///
    /// Repeatedly inspects the top of `buffered_responses` and, while its block number matches
    /// `next_chain_tip_block_number`, pops and processes it; entries above the expected block
    /// are discarded as stale.
    ///
    /// Returns an error if the next expected response was popped, but failed validation.
    fn try_validate_buffered(&mut self) -> Option<ReverseHeadersDownloaderError<H::Header>> {
        loop {
            // Check to see if we've already received the next value
            let next_response = self.buffered_responses.peek_mut()?;
            let next_block_number = next_response.block_number();
            match next_block_number.cmp(&self.next_chain_tip_block_number) {
                // below the next expected block: nothing more to validate right now
                Ordering::Less => return None,
                Ordering::Equal => {
                    let OrderedHeadersResponse { headers, request, peer_id } =
                        PeekMut::pop(next_response);
                    self.metrics.buffered_responses.decrement(1.);

                    if let Err(err) = self.process_next_headers(request, headers, peer_id) {
                        return Some(err)
                    }
                }
                Ordering::Greater => {
                    // above the next expected block: stale entry, drop it
                    self.metrics.buffered_responses.decrement(1.);
                    PeekMut::pop(next_response);
                }
            }
        }
    }
588
    /// Returns the request for the `sync_target` header.
    ///
    /// A single falling request with a limit of 1: only the target header itself is needed
    /// to resolve the sync target's block number.
    const fn get_sync_target_request(&self, start: BlockHashOrNumber) -> HeadersRequest {
        HeadersRequest::falling(start, 1)
    }
593
594    /// Starts a request future
595    fn submit_request(&self, request: HeadersRequest, priority: Priority) {
596        trace!(target: "downloaders::headers", ?request, "Submitting headers request");
597        self.in_progress_queue.push(self.request_fut(request, priority));
598        self.metrics.in_flight_requests.increment(1.);
599    }
600
601    fn request_fut(
602        &self,
603        request: HeadersRequest,
604        priority: Priority,
605    ) -> HeadersRequestFuture<H::Output> {
606        let client = Arc::clone(&self.client);
607        HeadersRequestFuture {
608            request: Some(request.clone()),
609            fut: client.get_headers_with_priority(request, priority),
610        }
611    }
612
    /// Validate whether the header is valid in relation to its parent
    ///
    /// Delegates to [`validate_header_download`] with the configured consensus implementation.
    fn validate(
        &self,
        header: &SealedHeader<H::Header>,
        parent: &SealedHeader<H::Header>,
    ) -> DownloadResult<()> {
        validate_header_download(&self.consensus, header, parent)
    }
621
622    /// Clears all requests/responses.
623    fn clear(&mut self) {
624        self.lowest_validated_header.take();
625        self.queued_validated_headers = Vec::new();
626        self.buffered_responses = BinaryHeap::new();
627        self.in_progress_queue.clear();
628
629        self.metrics.in_flight_requests.set(0.);
630        self.metrics.buffered_responses.set(0.);
631    }
632
    /// Splits off the next batch of headers
    ///
    /// Removes and returns up to `stream_batch_size` headers from the front of
    /// `queued_validated_headers` (the highest block numbers), leaving the remainder queued.
    fn split_next_batch(&mut self) -> Vec<SealedHeader<H::Header>> {
        let batch_size = self.stream_batch_size.min(self.queued_validated_headers.len());
        let mut rem = self.queued_validated_headers.split_off(batch_size);
        std::mem::swap(&mut rem, &mut self.queued_validated_headers);
        // If the downloader consumer does not flush headers at the same rate that the downloader
        // queues them, then the `queued_validated_headers` buffer can grow unbounded.
        //
        // The semantics of `split_off` state that the capacity of the original buffer is
        // unchanged, so queued_validated_headers will then have only `batch_size` elements, and
        // its original capacity. Because `rem` is initially populated with elements `[batch_size,
        // len)` of `queued_validated_headers`, it will have a capacity of at least `len -
        // batch_size`, and the total memory allocated by the two buffers will be around double the
        // original size of `queued_validated_headers`.
        //
        // These are then mem::swapped, leaving `rem` with a large capacity, but small length.
        //
        // To prevent these allocations from leaking to the consumer, we shrink the capacity of the
        // new buffer. The total memory allocated should then be not much more than the original
        // size of `queued_validated_headers`.
        rem.shrink_to_fit();
        rem
    }
656}
657
impl<H> ReverseHeadersDownloader<H>
where
    H: HeadersClient,
    Self: HeaderDownloader + 'static,
{
    /// Convert the downloader into a [`TaskDownloader`] by spawning it via the given [`Runtime`].
    ///
    /// See [`TaskDownloader::spawn_with`].
    pub fn into_task_with(
        self,
        runtime: &Runtime,
    ) -> TaskDownloader<<Self as HeaderDownloader>::Header> {
        TaskDownloader::spawn_with(self, runtime)
    }
}
671
672impl<H> HeaderDownloader for ReverseHeadersDownloader<H>
673where
674    H: HeadersClient<Header: reth_primitives_traits::BlockHeader> + 'static,
675{
676    type Header = H::Header;
677
678    fn update_local_head(&mut self, head: SealedHeader<H::Header>) {
679        // ensure we're only yielding headers that are in range and follow the current local head.
680        while self
681            .queued_validated_headers
682            .last()
683            .is_some_and(|last| last.number() <= head.number())
684        {
685            // headers are sorted high to low
686            self.queued_validated_headers.pop();
687        }
688        trace!(
689            target: "downloaders::headers",
690            head=?head.num_hash(),
691            "Updating local head"
692        );
693        // update the local head
694        self.local_head = Some(head);
695    }
696
    /// If the given target is different from the current target, we need to update the sync target
    fn update_sync_target(&mut self, target: SyncTarget) {
        let current_tip = self.sync_target.as_ref().and_then(|t| t.hash());
        trace!(
            target: "downloaders::headers",
            sync_target=?target,
            current_tip=?current_tip,
            "Updating sync target"
        );
        match target {
            SyncTarget::Tip(tip) => {
                if Some(tip) != current_tip {
                    trace!(target: "downloaders::headers", current=?current_tip, new=?tip, "Update sync target");
                    let new_sync_target = SyncTargetBlock::from_hash(tip);

                    // if the new sync target is the next queued request we don't need to re-start
                    // the target update
                    if let Some(target_number) = self
                        .queued_validated_headers
                        .first()
                        .filter(|h| h.hash() == tip)
                        .map(|h| h.number())
                    {
                        self.sync_target = Some(new_sync_target.with_number(target_number));
                        return
                    }

                    // the tip's number is unknown: issue a dedicated request to resolve it
                    trace!(target: "downloaders::headers", new=?target, "Request new sync target");
                    self.metrics.out_of_order_requests.increment(1);
                    self.sync_target = Some(new_sync_target);
                    self.sync_target_request = Some(
                        self.request_fut(self.get_sync_target_request(tip.into()), Priority::High),
                    );
                }
            }
            SyncTarget::Gap(existing) => {
                let target = existing.parent;
                if Some(target) != current_tip {
                    // there could be a sync target request in progress
                    self.sync_target_request.take();
                    // If the target has changed, update the request pointers based on the new
                    // targeted block number
                    let parent_block_number = existing.block.number.saturating_sub(1);

                    trace!(target: "downloaders::headers", current=?current_tip, new=?target, %parent_block_number, "Updated sync target");

                    // Update the sync target hash
                    self.sync_target = match self.sync_target.take() {
                        Some(sync_target) => Some(sync_target.with_hash(target)),
                        None => Some(SyncTargetBlock::from_hash(target)),
                    };
                    // the gap's block number is known, so no extra request is needed here
                    self.on_block_number_update(parent_block_number, parent_block_number);
                }
            }
            SyncTarget::TipNum(num) => {
                let current_tip_num = self.sync_target.as_ref().and_then(|t| t.number());
                if Some(num) != current_tip_num {
                    trace!(target: "downloaders::headers", %num, "Updating sync target based on num");
                    // just update the sync target
                    self.sync_target = Some(SyncTargetBlock::from_number(num));
                    self.sync_target_request = Some(
                        self.request_fut(self.get_sync_target_request(num.into()), Priority::High),
                    );
                }
            }
        }
    }
764
    /// Sets the number of headers yielded per `Stream::next` call.
    fn set_batch_size(&mut self, batch_size: usize) {
        self.stream_batch_size = batch_size;
    }
768}
769
impl<H> Stream for ReverseHeadersDownloader<H>
where
    H: HeadersClient<Header: reth_primitives_traits::BlockHeader> + 'static,
{
    type Item = HeadersDownloaderResult<Vec<SealedHeader<H::Header>>, H::Header>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();

        // The downloader boundaries (local head and sync target) have to be set in order
        // to start downloading data.
        if this.local_head.is_none() || this.sync_target.is_none() {
            trace!(
                target: "downloaders::headers",
                head=?this.local_block_number(),
                sync_target=?this.sync_target,
                "The downloader sync boundaries have not been set"
            );
            return Poll::Pending
        }

        // If we have a new tip request we need to complete that first before we send batched
        // requests
        while let Some(mut req) = this.sync_target_request.take() {
            match req.poll_unpin(cx) {
                Poll::Ready(outcome) => {
                    match this.on_sync_target_outcome(outcome) {
                        Ok(()) => break,
                        Err(ReverseHeadersDownloaderError::Response(error)) => {
                            trace!(target: "downloaders::headers", %error, "invalid sync target response");
                            if error.is_channel_closed() {
                                // download channel closed which means the network was dropped
                                return Poll::Ready(None)
                            }

                            this.penalize_peer(error.peer_id, &error.error);
                            this.metrics.increment_errors(&error.error);
                            // re-issue the failed sync target request
                            this.sync_target_request =
                                Some(this.request_fut(error.request, Priority::High));
                        }
                        Err(ReverseHeadersDownloaderError::Downloader(error)) => {
                            // unrecoverable downloader error: reset state and surface it
                            this.clear();
                            return Poll::Ready(Some(Err(error)))
                        }
                    };
                }
                Poll::Pending => {
                    // keep the in-flight request for the next poll
                    this.sync_target_request = Some(req);
                    return Poll::Pending
                }
            }
        }

        // shrink the buffer after handling sync target outcomes
        this.buffered_responses.shrink_to_fit();

        // this loop will submit new requests and poll them, if a new batch is ready it is returned
        // The actual work is done by the receiver of the request channel, this means, polling the
        // request future is just reading from a `oneshot::Receiver`. Hence, this loop tries to keep
        // the downloader at capacity at all times. The order of loops is as follows:
        // 1. poll futures to make room for followup requests (this will also prepare validated
        //    headers for 3.)
        // 2. exhaust all capacity by sending requests
        // 3. return batch, if enough validated
        // 4. return Pending if 2.) did not submit a new request, else continue
        loop {
            // poll requests
            while let Poll::Ready(Some(outcome)) = this.in_progress_queue.poll_next_unpin(cx) {
                this.metrics.in_flight_requests.decrement(1.);
                // handle response
                match this.on_headers_outcome(outcome) {
                    Ok(()) => (),
                    Err(ReverseHeadersDownloaderError::Response(error)) => {
                        if error.is_channel_closed() {
                            // download channel closed which means the network was dropped
                            return Poll::Ready(None)
                        }
                        this.on_headers_error(error);
                    }
                    Err(ReverseHeadersDownloaderError::Downloader(error)) => {
                        this.clear();
                        return Poll::Ready(Some(Err(error)))
                    }
                };
            }

            // shrink the buffer after handling headers outcomes
            this.buffered_responses.shrink_to_fit();

            // marks the loop's exit condition: exit if no requests submitted
            let mut progress = false;

            let concurrent_request_limit = this.concurrent_request_limit();
            // populate requests, but stop once the response buffer is at capacity to bound memory
            while this.in_progress_queue.len() < concurrent_request_limit &&
                this.buffered_responses.len() < this.max_buffered_responses
            {
                if let Some(request) = this.next_request() {
                    trace!(
                        target: "downloaders::headers",
                        "Requesting headers {request:?}"
                    );
                    progress = true;
                    this.submit_request(request, Priority::Normal);
                } else {
                    // no more requests
                    break
                }
            }

            // yield next batch
            if this.queued_validated_headers.len() >= this.stream_batch_size {
                let next_batch = this.split_next_batch();

                // Note: if this would drain all headers, we need to keep the lowest (last index)
                // around so we can continue validating headers responses.
                if this.queued_validated_headers.is_empty() {
                    this.lowest_validated_header = next_batch.last().cloned();
                }

                trace!(target: "downloaders::headers", batch=%next_batch.len(), "Returning validated batch");

                this.metrics.total_flushed.increment(next_batch.len() as u64);
                return Poll::Ready(Some(Ok(next_batch)))
            }

            if !progress {
                break
            }
        }

        // all requests are handled, stream is finished
        if this.in_progress_queue.is_empty() {
            let next_batch = this.split_next_batch();
            if next_batch.is_empty() {
                this.clear();
                return Poll::Ready(None)
            }
            this.metrics.total_flushed.increment(next_batch.len() as u64);
            return Poll::Ready(Some(Ok(next_batch)))
        }

        Poll::Pending
    }
}
914
/// A future that returns a list of headers on success.
#[derive(Debug)]
struct HeadersRequestFuture<F> {
    /// The request this future answers; taken (set to `None`) once the inner future resolves,
    /// see the `Future` impl.
    request: Option<HeadersRequest>,
    /// The in-flight client request future.
    fut: F,
}
921
922impl<F, H> Future for HeadersRequestFuture<F>
923where
924    F: Future<Output = PeerRequestResult<Vec<H>>> + Sync + Send + Unpin,
925{
926    type Output = HeadersRequestOutcome<H>;
927
928    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
929        let this = self.get_mut();
930        let outcome = ready!(this.fut.poll_unpin(cx));
931        let request = this.request.take().unwrap();
932
933        Poll::Ready(HeadersRequestOutcome { request, outcome })
934    }
935}
936
/// The outcome of the [`HeadersRequestFuture`]
struct HeadersRequestOutcome<H> {
    /// The request that produced this outcome.
    request: HeadersRequest,
    /// The peer's response (or error) for that request.
    outcome: PeerRequestResult<Vec<H>>,
}
942
// === impl HeadersRequestOutcome ===
944
impl<H> HeadersRequestOutcome<H> {
    /// Returns the block number the request started at.
    ///
    /// Panics if the request was issued by hash; batched requests are built from a numeric
    /// start (see `calc_next_request`), so this is expected to always be a number here.
    const fn block_number(&self) -> u64 {
        self.request.start.as_number().expect("is number")
    }
}
950
/// Wrapper type to order responses
#[derive(Debug)]
struct OrderedHeadersResponse<H> {
    /// The headers returned by the peer.
    headers: Vec<H>,
    /// The request that produced these headers; its start block drives the ordering.
    request: HeadersRequest,
    /// The peer that sent the response.
    peer_id: PeerId,
}
958
959// === impl OrderedHeadersResponse ===
960
impl<H> OrderedHeadersResponse<H> {
    /// Returns the block number the originating request started at.
    ///
    /// Panics if the request was issued by hash; batched requests are built from a numeric
    /// start (see `calc_next_request`), so this is expected to always be a number here.
    const fn block_number(&self) -> u64 {
        self.request.start.as_number().expect("is number")
    }
}
966
// Responses are compared solely by the start block number of their request.
impl<H> PartialEq for OrderedHeadersResponse<H> {
    fn eq(&self, other: &Self) -> bool {
        self.block_number() == other.block_number()
    }
}

impl<H> Eq for OrderedHeadersResponse<H> {}

impl<H> PartialOrd for OrderedHeadersResponse<H> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

// Ascending by start block number: in a `BinaryHeap` (max-heap) the response with the
// highest start block pops first, matching the reverse (high-to-low) download order.
impl<H> Ord for OrderedHeadersResponse<H> {
    fn cmp(&self, other: &Self) -> Ordering {
        self.block_number().cmp(&other.block_number())
    }
}
986
/// Type returned if a bad response was processed
#[derive(Debug, Error)]
#[error("error requesting headers from peer {peer_id:?}: {error}; request: {request:?}")]
struct HeadersResponseError {
    /// The request that produced the bad response; kept so it can be re-issued.
    request: HeadersRequest,
    /// The peer that sent the response, if the request reached a peer.
    peer_id: Option<PeerId>,
    /// The underlying download error.
    #[source]
    error: DownloadError,
}
996
997impl HeadersResponseError {
998    /// Returns true if the error was caused by a closed channel to the network.
999    const fn is_channel_closed(&self) -> bool {
1000        if let DownloadError::RequestError(ref err) = self.error {
1001            return err.is_channel_closed()
1002        }
1003        false
1004    }
1005}
1006
/// The block to which we want to close the gap: (local head...sync target]
/// This tracks the sync target block, so this could be either a block number or hash.
///
/// The variant is upgraded to [`Self::HashAndNumber`] once both identifiers are known,
/// see [`Self::with_hash`] and [`Self::with_number`].
#[derive(Clone, Debug)]
pub enum SyncTargetBlock {
    /// Block hash of the targeted block
    Hash(B256),
    /// Block number of the targeted block
    Number(u64),
    /// Both the block hash and number of the targeted block
    HashAndNumber {
        /// Block hash of the targeted block
        hash: B256,
        /// Block number of the targeted block
        number: u64,
    },
}
1023
1024impl SyncTargetBlock {
1025    /// Create new instance from hash.
1026    const fn from_hash(hash: B256) -> Self {
1027        Self::Hash(hash)
1028    }
1029
1030    /// Create new instance from number.
1031    const fn from_number(num: u64) -> Self {
1032        Self::Number(num)
1033    }
1034
1035    /// Set the hash for the sync target.
1036    const fn with_hash(self, hash: B256) -> Self {
1037        match self {
1038            Self::Hash(_) => Self::Hash(hash),
1039            Self::Number(number) | Self::HashAndNumber { number, .. } => {
1040                Self::HashAndNumber { hash, number }
1041            }
1042        }
1043    }
1044
1045    /// Set a number on the instance.
1046    const fn with_number(self, number: u64) -> Self {
1047        match self {
1048            Self::Hash(hash) | Self::HashAndNumber { hash, .. } => {
1049                Self::HashAndNumber { hash, number }
1050            }
1051            Self::Number(_) => Self::Number(number),
1052        }
1053    }
1054
1055    /// Replace the target block number, and return the old block number, if it was set.
1056    ///
1057    /// If the target block is a hash, this be converted into a `HashAndNumber`, but return `None`.
1058    /// The semantics should be equivalent to that of `Option::replace`.
1059    const fn replace_number(&mut self, number: u64) -> Option<u64> {
1060        match self {
1061            Self::Hash(hash) => {
1062                *self = Self::HashAndNumber { hash: *hash, number };
1063                None
1064            }
1065            Self::Number(old_number) => {
1066                let res = Some(*old_number);
1067                *self = Self::Number(number);
1068                res
1069            }
1070            Self::HashAndNumber { number: old_number, hash } => {
1071                let res = Some(*old_number);
1072                *self = Self::HashAndNumber { hash: *hash, number };
1073                res
1074            }
1075        }
1076    }
1077
1078    /// Return the hash of the target block, if it is set.
1079    const fn hash(&self) -> Option<B256> {
1080        match self {
1081            Self::Hash(hash) | Self::HashAndNumber { hash, .. } => Some(*hash),
1082            Self::Number(_) => None,
1083        }
1084    }
1085
1086    /// Return the block number of the sync target, if it is set.
1087    const fn number(&self) -> Option<u64> {
1088        match self {
1089            Self::Hash(_) => None,
1090            Self::Number(number) | Self::HashAndNumber { number, .. } => Some(*number),
1091        }
1092    }
1093}
1094
/// The builder for [`ReverseHeadersDownloader`] with
/// some default settings
#[derive(Debug)]
pub struct ReverseHeadersDownloaderBuilder {
    /// The batch size per one request
    request_limit: u64,
    /// The number of headers the downloader yields per `Stream::next` call
    stream_batch_size: usize,
    /// Lower bound for the number of concurrent requests to keep active
    min_concurrent_requests: usize,
    /// Upper bound for the number of concurrent requests
    max_concurrent_requests: usize,
    /// How many responses to buffer
    max_buffered_responses: usize,
}
1110
1111impl ReverseHeadersDownloaderBuilder {
1112    /// Creates a new [`ReverseHeadersDownloaderBuilder`] with configurations based on the provided
1113    /// [`HeadersConfig`].
1114    pub fn new(config: HeadersConfig) -> Self {
1115        Self::default()
1116            .request_limit(config.downloader_request_limit)
1117            .min_concurrent_requests(config.downloader_min_concurrent_requests)
1118            .max_concurrent_requests(config.downloader_max_concurrent_requests)
1119            .max_buffered_responses(config.downloader_max_buffered_responses)
1120            .stream_batch_size(config.commit_threshold as usize)
1121    }
1122}
1123
impl Default for ReverseHeadersDownloaderBuilder {
    fn default() -> Self {
        Self {
            // number of validated headers yielded per `Stream::next` call
            stream_batch_size: 10_000,
            // This is just below the max number of headers commonly in a headers response (1024), see also <https://github.com/ethereum/go-ethereum/blob/b0d44338bbcefee044f1f635a84487cbbd8f0538/eth/protocols/eth/handler.go#L38-L40>
            // with ~500bytes per header this around 0.5MB per request max
            request_limit: 1_000,
            // bounds for the dynamically determined number of concurrent requests
            max_concurrent_requests: 100,
            min_concurrent_requests: 5,
            // caps out-of-order response memory at `request_limit * max_buffered_responses` headers
            max_buffered_responses: 100,
        }
    }
}
1137
1138impl ReverseHeadersDownloaderBuilder {
1139    /// Set the request batch size.
1140    ///
1141    /// This determines the `limit` for a `GetBlockHeaders` requests, the number of headers we ask
1142    /// for.
1143    pub const fn request_limit(mut self, limit: u64) -> Self {
1144        self.request_limit = limit;
1145        self
1146    }
1147
1148    /// Set the stream batch size
1149    ///
1150    /// This determines the number of headers the [`ReverseHeadersDownloader`] will yield on
1151    /// `Stream::next`. This will be the amount of headers the headers stage will commit at a
1152    /// time.
1153    pub const fn stream_batch_size(mut self, size: usize) -> Self {
1154        self.stream_batch_size = size;
1155        self
1156    }
1157
1158    /// Set the min amount of concurrent requests.
1159    ///
1160    /// If there's capacity the [`ReverseHeadersDownloader`] will keep at least this many requests
1161    /// active at a time.
1162    pub const fn min_concurrent_requests(mut self, min_concurrent_requests: usize) -> Self {
1163        self.min_concurrent_requests = min_concurrent_requests;
1164        self
1165    }
1166
1167    /// Set the max amount of concurrent requests.
1168    ///
1169    /// The downloader's concurrent requests won't exceed the given amount.
1170    pub const fn max_concurrent_requests(mut self, max_concurrent_requests: usize) -> Self {
1171        self.max_concurrent_requests = max_concurrent_requests;
1172        self
1173    }
1174
1175    /// How many responses to buffer internally.
1176    ///
1177    /// This essentially determines how much memory the downloader can use for buffering responses
1178    /// that arrive out of order. The total number of buffered headers is `request_limit *
1179    /// max_buffered_responses`. If the [`ReverseHeadersDownloader`]'s buffered responses exceeds
1180    /// this threshold it waits until there's capacity again before sending new requests.
1181    pub const fn max_buffered_responses(mut self, max_buffered_responses: usize) -> Self {
1182        self.max_buffered_responses = max_buffered_responses;
1183        self
1184    }
1185
1186    /// Build [`ReverseHeadersDownloader`] with provided consensus
1187    /// and header client implementations
1188    pub fn build<H>(
1189        self,
1190        client: H,
1191        consensus: Arc<dyn HeaderValidator<H::Header>>,
1192    ) -> ReverseHeadersDownloader<H>
1193    where
1194        H: HeadersClient + 'static,
1195    {
1196        let Self {
1197            request_limit,
1198            stream_batch_size,
1199            min_concurrent_requests,
1200            max_concurrent_requests,
1201            max_buffered_responses,
1202        } = self;
1203        ReverseHeadersDownloader {
1204            consensus,
1205            client: Arc::new(client),
1206            local_head: None,
1207            sync_target: None,
1208            // Note: we set these to `0` first, they'll be updated once the sync target response is
1209            // handled and only used afterwards
1210            next_request_block_number: 0,
1211            next_chain_tip_block_number: 0,
1212            lowest_validated_header: None,
1213            request_limit,
1214            min_concurrent_requests,
1215            max_concurrent_requests,
1216            stream_batch_size,
1217            max_buffered_responses,
1218            sync_target_request: None,
1219            in_progress_queue: Default::default(),
1220            buffered_responses: Default::default(),
1221            queued_validated_headers: Default::default(),
1222            metrics: Default::default(),
1223        }
1224    }
1225}
1226
1227/// Configures and returns the next [`HeadersRequest`] based on the given parameters
1228///
1229/// The request will start at the given `next_request_block_number` block.
1230/// The `limit` of the request will either be the targeted `request_limit` or the difference of
1231/// `next_request_block_number` and the `local_head` in case this is smaller than the targeted
1232/// `request_limit`.
1233#[inline]
1234fn calc_next_request(
1235    local_head: u64,
1236    next_request_block_number: u64,
1237    request_limit: u64,
1238) -> HeadersRequest {
1239    // downloading is in reverse
1240    let diff = next_request_block_number - local_head;
1241    let limit = diff.min(request_limit);
1242    let start = next_request_block_number;
1243    HeadersRequest::falling(start.into(), limit)
1244}
1245
#[cfg(test)]
mod tests {
    use super::*;
    use crate::headers::test_utils::child_header;
    use alloy_consensus::Header;
    use alloy_eips::{eip1898::BlockWithParent, BlockNumHash};
    use assert_matches::assert_matches;
    use reth_consensus::test_utils::TestConsensus;
    use reth_network_p2p::test_utils::TestHeadersClient;

    /// Tests that `replace_number` works the same way as `Option::replace`
    #[test]
    fn test_replace_number_semantics() {
        struct Fixture {
            // input fields (both SyncTargetBlock and Option<u64>)
            sync_target_block: SyncTargetBlock,
            sync_target_option: Option<u64>,

            // option to replace
            replace_number: u64,

            // expected method result
            expected_result: Option<u64>,

            // output state
            new_number: u64,
        }

        let fixtures = vec![
            Fixture {
                sync_target_block: SyncTargetBlock::Hash(B256::random()),
                // Hash maps to None here, all other variants map to Some
                sync_target_option: None,
                replace_number: 1,
                expected_result: None,
                new_number: 1,
            },
            Fixture {
                sync_target_block: SyncTargetBlock::Number(1),
                sync_target_option: Some(1),
                replace_number: 2,
                expected_result: Some(1),
                new_number: 2,
            },
            Fixture {
                sync_target_block: SyncTargetBlock::HashAndNumber {
                    hash: B256::random(),
                    number: 1,
                },
                sync_target_option: Some(1),
                replace_number: 2,
                expected_result: Some(1),
                new_number: 2,
            },
        ];

        for fixture in fixtures {
            let mut sync_target_block = fixture.sync_target_block;
            let result = sync_target_block.replace_number(fixture.replace_number);
            assert_eq!(result, fixture.expected_result);
            assert_eq!(sync_target_block.number(), Some(fixture.new_number));

            let mut sync_target_option = fixture.sync_target_option;
            let option_result = sync_target_option.replace(fixture.replace_number);
            assert_eq!(option_result, fixture.expected_result);
            assert_eq!(sync_target_option, Some(fixture.new_number));
        }
    }

    /// Tests that updating the sync target only issues a new request when the target changed,
    /// and that a `Gap` target sets the number without requesting.
    #[test]
    fn test_sync_target_update() {
        let client = Arc::new(TestHeadersClient::default());

        let genesis = SealedHeader::default();

        let mut downloader = ReverseHeadersDownloaderBuilder::default()
            .build(Arc::clone(&client), Arc::new(TestConsensus::default()));
        downloader.update_local_head(genesis);
        downloader.update_sync_target(SyncTarget::Tip(B256::random()));

        downloader.sync_target_request.take();

        let target = SyncTarget::Tip(B256::random());
        downloader.update_sync_target(target);
        assert!(downloader.sync_target_request.is_some());

        downloader.sync_target_request.take();
        let target = SyncTarget::Gap(BlockWithParent {
            block: BlockNumHash::new(0, B256::random()),
            parent: Default::default(),
        });
        downloader.update_sync_target(target);
        assert!(downloader.sync_target_request.is_none());
        assert_matches!(
            downloader.sync_target,
            Some(target) => target.number().is_some()
        );
    }

    /// Tests that updating the local head prunes queued headers at or below the new head
    #[test]
    fn test_head_update() {
        let client = Arc::new(TestHeadersClient::default());

        let header: SealedHeader = SealedHeader::default();

        let mut downloader = ReverseHeadersDownloaderBuilder::default()
            .build(Arc::clone(&client), Arc::new(TestConsensus::default()));
        downloader.update_local_head(header.clone());
        downloader.update_sync_target(SyncTarget::Tip(B256::random()));

        downloader.queued_validated_headers.push(header.clone());
        let mut next = header.as_ref().clone();
        next.number += 1;
        downloader.update_local_head(SealedHeader::new(next, B256::random()));
        assert!(downloader.queued_validated_headers.is_empty());
    }

    /// Tests that request calc works
    #[test]
    fn test_request_calc() {
        // request an entire batch
        let local = 0;
        let next = 1000;
        let batch_size = 2;
        let request = calc_next_request(local, next, batch_size);
        assert_eq!(request.start, next.into());
        assert_eq!(request.limit, batch_size);

        // only request 1
        let local = 999;
        let next = 1000;
        let batch_size = 2;
        let request = calc_next_request(local, next, batch_size);
        assert_eq!(request.start, next.into());
        assert_eq!(request.limit, 1);
    }

    /// Tests that consecutive requests walk down from the start block to the local head
    #[test]
    fn test_next_request() {
        let client = Arc::new(TestHeadersClient::default());

        let genesis = SealedHeader::default();

        let batch_size = 99;
        let start = 1000;
        let mut downloader = ReverseHeadersDownloaderBuilder::default()
            .request_limit(batch_size)
            .build(Arc::clone(&client), Arc::new(TestConsensus::default()));
        downloader.update_local_head(genesis);
        downloader.update_sync_target(SyncTarget::Tip(B256::random()));

        downloader.next_request_block_number = start;

        let mut total = 0;
        while let Some(req) = downloader.next_request() {
            assert_eq!(req.start, (start - total).into());
            total += req.limit;
        }
        assert_eq!(total, start);
        assert_eq!(Some(downloader.next_request_block_number), downloader.local_block_number());
    }

    /// Tests that the response with the higher start block pops from the heap first
    #[test]
    fn test_resp_order() {
        let mut heap = BinaryHeap::new();
        let hi = 1u64;
        heap.push(OrderedHeadersResponse::<Header> {
            headers: vec![],
            request: HeadersRequest { start: hi.into(), limit: 0, direction: Default::default() },
            peer_id: Default::default(),
        });

        let lo = 0u64;
        heap.push(OrderedHeadersResponse {
            headers: vec![],
            request: HeadersRequest { start: lo.into(), limit: 0, direction: Default::default() },
            peer_id: Default::default(),
        });

        assert_eq!(heap.pop().unwrap().block_number(), hi);
        assert_eq!(heap.pop().unwrap().block_number(), lo);
    }

    #[tokio::test]
    async fn download_at_fork_head() {
        reth_tracing::init_test_tracing();

        let client = Arc::new(TestHeadersClient::default());

        let p3 = SealedHeader::default();
        let p2 = child_header(&p3);
        let p1 = child_header(&p2);
        let p0 = child_header(&p1);

        let mut downloader = ReverseHeadersDownloaderBuilder::default()
            .stream_batch_size(3)
            .request_limit(3)
            .build(Arc::clone(&client), Arc::new(TestConsensus::default()));
        downloader.update_local_head(p3.clone());
        downloader.update_sync_target(SyncTarget::Tip(p0.hash()));

        client
            .extend(vec![
                p0.as_ref().clone(),
                p1.as_ref().clone(),
                p2.as_ref().clone(),
                p3.as_ref().clone(),
            ])
            .await;

        let headers = downloader.next().await.unwrap();
        assert_eq!(headers.unwrap(), vec![p0, p1, p2,]);
        assert!(downloader.buffered_responses.is_empty());
        assert!(downloader.next().await.is_none());
        assert!(downloader.next().await.is_none());
    }

    #[tokio::test]
    async fn download_one_by_one() {
        reth_tracing::init_test_tracing();
        let p3 = SealedHeader::default();
        let p2 = child_header(&p3);
        let p1 = child_header(&p2);
        let p0 = child_header(&p1);

        let client = Arc::new(TestHeadersClient::default());
        let mut downloader = ReverseHeadersDownloaderBuilder::default()
            .stream_batch_size(1)
            .request_limit(1)
            .build(Arc::clone(&client), Arc::new(TestConsensus::default()));
        downloader.update_local_head(p3.clone());
        downloader.update_sync_target(SyncTarget::Tip(p0.hash()));

        client
            .extend(vec![
                p0.as_ref().clone(),
                p1.as_ref().clone(),
                p2.as_ref().clone(),
                p3.as_ref().clone(),
            ])
            .await;

        let headers = downloader.next().await.unwrap();
        let headers = headers.unwrap();
        assert_eq!(headers, vec![p0]);
        assert_eq!(headers.capacity(), headers.len());

        let headers = downloader.next().await.unwrap();
        let headers = headers.unwrap();
        assert_eq!(headers, vec![p1]);
        assert_eq!(headers.capacity(), headers.len());

        let headers = downloader.next().await.unwrap();
        let headers = headers.unwrap();
        assert_eq!(headers, vec![p2]);
        assert_eq!(headers.capacity(), headers.len());

        assert!(downloader.next().await.is_none());
    }

    #[tokio::test]
    async fn download_one_by_one_larger_request_limit() {
        reth_tracing::init_test_tracing();
        let p3 = SealedHeader::default();
        let p2 = child_header(&p3);
        let p1 = child_header(&p2);
        let p0 = child_header(&p1);

        let client = Arc::new(TestHeadersClient::default());
        let mut downloader = ReverseHeadersDownloaderBuilder::default()
            .stream_batch_size(1)
            .request_limit(3)
            .build(Arc::clone(&client), Arc::new(TestConsensus::default()));
        downloader.update_local_head(p3.clone());
        downloader.update_sync_target(SyncTarget::Tip(p0.hash()));

        client
            .extend(vec![
                p0.as_ref().clone(),
                p1.as_ref().clone(),
                p2.as_ref().clone(),
                p3.as_ref().clone(),
            ])
            .await;

        let headers = downloader.next().await.unwrap();
        let headers = headers.unwrap();
        assert_eq!(headers, vec![p0]);
        assert_eq!(headers.capacity(), headers.len());

        let headers = downloader.next().await.unwrap();
        let headers = headers.unwrap();
        assert_eq!(headers, vec![p1]);
        assert_eq!(headers.capacity(), headers.len());

        let headers = downloader.next().await.unwrap();
        let headers = headers.unwrap();
        assert_eq!(headers, vec![p2]);
        assert_eq!(headers.capacity(), headers.len());

        assert!(downloader.next().await.is_none());
    }
}