reth_downloaders/headers/
reverse_headers.rs

1//! A headers downloader that can handle multiple requests concurrently.
2
3use super::task::TaskDownloader;
4use crate::metrics::HeaderDownloaderMetrics;
5use alloy_consensus::BlockHeader;
6use alloy_eips::BlockHashOrNumber;
7use alloy_primitives::{BlockNumber, Sealable, B256};
8use futures::{stream::Stream, FutureExt};
9use futures_util::{stream::FuturesUnordered, StreamExt};
10use rayon::prelude::*;
11use reth_config::config::HeadersConfig;
12use reth_consensus::HeaderValidator;
13use reth_network_p2p::{
14    error::{DownloadError, DownloadResult, PeerRequestResult},
15    headers::{
16        client::{HeadersClient, HeadersRequest},
17        downloader::{validate_header_download, HeaderDownloader, SyncTarget},
18        error::{HeadersDownloaderError, HeadersDownloaderResult},
19    },
20    priority::Priority,
21};
22use reth_network_peers::PeerId;
23use reth_primitives_traits::{GotExpected, SealedHeader};
24use reth_tasks::{TaskSpawner, TokioTaskExecutor};
25use std::{
26    cmp::{Ordering, Reverse},
27    collections::{binary_heap::PeekMut, BinaryHeap},
28    future::Future,
29    pin::Pin,
30    sync::Arc,
31    task::{ready, Context, Poll},
32};
33use thiserror::Error;
34use tracing::{debug, error, trace};
35
/// A heuristic that is used to determine the number of requests that should be prepared for a peer.
/// This should ensure that there are always requests lined up for peers to handle while the
/// downloader is yielding a next batch of headers that is being committed to the database.
///
/// Used by [`ReverseHeadersDownloader::concurrent_request_limit`], which multiplies the number of
/// connected peers by this value to size the in-flight request queue.
const REQUESTS_PER_PEER_MULTIPLIER: usize = 5;
40
/// Wrapper for internal downloader errors.
#[derive(Error, Debug)]
enum ReverseHeadersDownloaderError<H: Sealable> {
    /// An error emitted by the downloader itself, e.g. a detached head.
    #[error(transparent)]
    Downloader(#[from] HeadersDownloaderError<H>),
    /// A bad response from a peer; boxed to keep the enum small.
    #[error(transparent)]
    Response(#[from] Box<HeadersResponseError>),
}
49
50impl<H: Sealable> From<HeadersResponseError> for ReverseHeadersDownloaderError<H> {
51    fn from(value: HeadersResponseError) -> Self {
52        Self::Response(Box::new(value))
53    }
54}
55
/// Downloads headers concurrently.
///
/// This [`HeaderDownloader`] downloads headers using the configured [`HeadersClient`].
/// Headers can be requested by hash or block number and take a `limit` parameter. This downloader
/// tries to fill the gap between the local head of the node and the chain tip by issuing multiple
/// requests at a time but yielding them in batches on [`Stream::poll_next`].
///
/// **Note:** This downloader downloads in reverse, see also
/// [`reth_network_p2p::headers::client::HeadersDirection`], this means the batches of headers that
/// this downloader yields will start at the chain tip and move towards the local head: falling
/// block numbers.
#[must_use = "Stream does nothing unless polled"]
#[derive(Debug)]
pub struct ReverseHeadersDownloader<H: HeadersClient> {
    /// Consensus client used to validate headers
    consensus: Arc<dyn HeaderValidator<H::Header>>,
    /// Client used to download headers.
    client: Arc<H>,
    /// The local head of the chain.
    local_head: Option<SealedHeader<H::Header>>,
    /// Block we want to close the gap to.
    sync_target: Option<SyncTargetBlock>,
    /// The block number to use for requests.
    ///
    /// Moves down towards the local head as requests are issued (reverse download).
    next_request_block_number: u64,
    /// Keeps track of the block we need to validate next.
    lowest_validated_header: Option<SealedHeader<H::Header>>,
    /// Tip block number to start validating from (in reverse)
    next_chain_tip_block_number: u64,
    /// The batch size per one request
    request_limit: u64,
    /// Minimum amount of requests to handle concurrently.
    min_concurrent_requests: usize,
    /// Maximum amount of requests to handle concurrently.
    max_concurrent_requests: usize,
    /// The number of block headers to return at once
    stream_batch_size: usize,
    /// Maximum amount of received headers to buffer internally.
    max_buffered_responses: usize,
    /// Contains the request to retrieve the headers for the sync target
    ///
    /// This will give us the block number of the `sync_target`, after which we can send multiple
    /// requests at a time.
    sync_target_request: Option<HeadersRequestFuture<H::Output>>,
    /// Requests in progress, polled in completion order.
    in_progress_queue: FuturesUnordered<HeadersRequestFuture<H::Output>>,
    /// Buffered, unvalidated responses
    buffered_responses: BinaryHeap<OrderedHeadersResponse<H::Header>>,
    /// Buffered, _sorted_ and validated headers ready to be returned.
    ///
    /// Note: headers are sorted from high to low
    queued_validated_headers: Vec<SealedHeader<H::Header>>,
    /// Header downloader metrics.
    metrics: HeaderDownloaderMetrics,
}
110
111// === impl ReverseHeadersDownloader ===
112
113impl<H> ReverseHeadersDownloader<H>
114where
115    H: HeadersClient<Header: reth_primitives_traits::BlockHeader> + 'static,
116{
117    /// Convenience method to create a [`ReverseHeadersDownloaderBuilder`] without importing it
118    pub fn builder() -> ReverseHeadersDownloaderBuilder {
119        ReverseHeadersDownloaderBuilder::default()
120    }
121
122    /// Returns the block number the local node is at.
123    #[inline]
124    fn local_block_number(&self) -> Option<BlockNumber> {
125        self.local_head.as_ref().map(|h| h.number())
126    }
127
128    /// Returns the existing local head block number
129    ///
130    /// # Panics
131    ///
132    /// If the local head has not been set.
133    #[inline]
134    fn existing_local_block_number(&self) -> BlockNumber {
135        self.local_head.as_ref().expect("is initialized").number()
136    }
137
138    /// Returns the existing sync target.
139    ///
140    /// # Panics
141    ///
142    /// If the sync target has never been set.
143    #[inline]
144    fn existing_sync_target(&self) -> SyncTargetBlock {
145        self.sync_target.as_ref().expect("is initialized").clone()
146    }
147
148    /// Max requests to handle at the same time
149    ///
150    /// This depends on the number of active peers but will always be
151    /// `min_concurrent_requests..max_concurrent_requests`
152    #[inline]
153    fn concurrent_request_limit(&self) -> usize {
154        let num_peers = self.client.num_connected_peers();
155
156        // we try to keep more requests than available peers active so that there's always a
157        // followup request available for a peer
158        let dynamic_target = num_peers * REQUESTS_PER_PEER_MULTIPLIER;
159        let max_dynamic = dynamic_target.max(self.min_concurrent_requests);
160
161        // If only a few peers are connected we keep it low
162        if num_peers < self.min_concurrent_requests {
163            return max_dynamic
164        }
165
166        max_dynamic.min(self.max_concurrent_requests)
167    }
168
169    /// Returns the next header request
170    ///
171    /// This will advance the current block towards the local head.
172    ///
173    /// Returns `None` if no more requests are required.
174    fn next_request(&mut self) -> Option<HeadersRequest> {
175        if let Some(local_head) = self.local_block_number() &&
176            self.next_request_block_number > local_head
177        {
178            let request =
179                calc_next_request(local_head, self.next_request_block_number, self.request_limit);
180            // need to shift the tracked request block number based on the number of requested
181            // headers so follow-up requests will use that as start.
182            self.next_request_block_number -= request.limit;
183
184            return Some(request)
185        }
186
187        None
188    }
189
190    /// Returns the next header to use for validation.
191    ///
192    /// Since this downloader downloads blocks with falling block number, this will return the
193    /// lowest (in terms of block number) validated header.
194    ///
195    /// This is either the last `queued_validated_headers`, or if has been drained entirely the
196    /// `lowest_validated_header`.
197    ///
198    /// This only returns `None` if we haven't fetched the initial chain tip yet.
199    fn lowest_validated_header(&self) -> Option<&SealedHeader<H::Header>> {
200        self.queued_validated_headers.last().or(self.lowest_validated_header.as_ref())
201    }
202
203    /// Resets the request trackers and clears the sync target.
204    ///
205    /// This ensures the downloader will restart after a new sync target has been set.
206    fn reset(&mut self) {
207        debug!(target: "downloaders::headers", "Resetting headers downloader");
208        self.next_request_block_number = 0;
209        self.next_chain_tip_block_number = 0;
210        self.sync_target.take();
211    }
212
213    /// Validate that the received header matches the expected sync target.
214    fn validate_sync_target(
215        &self,
216        header: &SealedHeader<H::Header>,
217        request: HeadersRequest,
218        peer_id: PeerId,
219    ) -> Result<(), Box<HeadersResponseError>> {
220        match self.existing_sync_target() {
221            SyncTargetBlock::Hash(hash) | SyncTargetBlock::HashAndNumber { hash, .. }
222                if header.hash() != hash =>
223            {
224                Err(Box::new(HeadersResponseError {
225                    request,
226                    peer_id: Some(peer_id),
227                    error: DownloadError::InvalidTip(
228                        GotExpected { got: header.hash(), expected: hash }.into(),
229                    ),
230                }))
231            }
232            SyncTargetBlock::Number(number) if header.number() != number => {
233                Err(Box::new(HeadersResponseError {
234                    request,
235                    peer_id: Some(peer_id),
236                    error: DownloadError::InvalidTipNumber(GotExpected {
237                        got: header.number(),
238                        expected: number,
239                    }),
240                }))
241            }
242            _ => Ok(()),
243        }
244    }
245
    /// Processes the next headers in line.
    ///
    /// This will validate all headers and insert them into the validated buffer.
    ///
    /// Returns an error if the given headers are invalid.
    ///
    /// Caution: this expects the `headers` to be sorted with _falling_ block numbers and to be
    /// non-empty (callers reject empty responses before invoking this).
    fn process_next_headers(
        &mut self,
        request: HeadersRequest,
        headers: Vec<H::Header>,
        peer_id: PeerId,
    ) -> Result<(), ReverseHeadersDownloaderError<H::Header>> {
        let mut validated = Vec::with_capacity(headers.len());

        // Seal (hash) all headers in parallel via rayon before validating them sequentially.
        let sealed_headers =
            headers.into_par_iter().map(SealedHeader::seal_slow).collect::<Vec<_>>();
        for parent in sealed_headers {
            // Validate that the header is the parent header of the last validated header.
            if let Some(validated_header) =
                validated.last().or_else(|| self.lowest_validated_header())
            {
                if let Err(error) = self.validate(validated_header, &parent) {
                    trace!(target: "downloaders::headers", %error ,"Failed to validate header");
                    return Err(
                        HeadersResponseError { request, peer_id: Some(peer_id), error }.into()
                    )
                }
            } else {
                // no previously validated header exists yet: this must be the sync target itself
                self.validate_sync_target(&parent, request.clone(), peer_id)?;
            }

            validated.push(parent);
        }

        // If the last (smallest) validated header attaches to the local head, validate it.
        if let Some((last_header, head)) = validated
            .last_mut()
            .zip(self.local_head.as_ref())
            .filter(|(last, head)| last.number() == head.number() + 1)
        {
            // Every header must be valid on its own
            if let Err(error) = self.consensus.validate_header(&*last_header) {
                trace!(target: "downloaders::headers", %error, "Failed to validate header");
                return Err(HeadersResponseError {
                    request,
                    peer_id: Some(peer_id),
                    error: DownloadError::HeaderValidation {
                        hash: head.hash(),
                        number: head.number(),
                        error: Box::new(error),
                    },
                }
                .into())
            }

            // If the header is valid on its own, but not against its parent, we return it as
            // detached head error.
            // In stage sync this will trigger an unwind because this means that the local head
            // is not part of the chain the sync target is on. In other words, the downloader was
            // unable to connect the sync target with the local head because the sync target and
            // the local head are on different chains.
            if let Err(error) = self.consensus.validate_header_against_parent(&*last_header, head) {
                let local_head = head.clone();
                // Replace the last header with a detached variant
                error!(target: "downloaders::headers", %error, number = last_header.number(), hash = ?last_header.hash(), "Header cannot be attached to known canonical chain");

                // Reset trackers so that we can start over the next time the sync target is
                // updated.
                // The expected event flow when that happens is that the node will unwind the local
                // chain and restart the downloader.
                self.reset();

                return Err(HeadersDownloaderError::DetachedHead {
                    local_head: Box::new(local_head),
                    header: Box::new(last_header.clone()),
                    error: Box::new(error),
                }
                .into())
            }
        }

        // update tracked block info (falling block number); `validated` is non-empty here because
        // `headers` was non-empty
        self.next_chain_tip_block_number =
            validated.last().expect("exists").number().saturating_sub(1);
        self.queued_validated_headers.extend(validated);

        Ok(())
    }
335
    /// Updates the state based on the given `target_block_number`
    ///
    /// There are three different outcomes:
    ///  * This is the first time this is called: current `sync_target` block is still `None`. In
    ///    which case we're initializing the request trackers to `next_block`
    ///  * The `target_block_number` is _higher_ than the current target. In which case we start
    ///    over with a new range
    ///  * The `target_block_number` is _lower_ than the current target or the _same_. In which case
    ///    we don't need to update the request trackers but need to ensure already buffered headers
    ///    are _not_ higher than the new `target_block_number`.
    fn on_block_number_update(&mut self, target_block_number: u64, next_block: u64) {
        // Update the trackers
        if let Some(old_target) =
            self.sync_target.as_mut().and_then(|t| t.replace_number(target_block_number))
        {
            if target_block_number > old_target {
                // the new target is higher than the old target we need to update the
                // request tracker and reset everything
                self.next_request_block_number = next_block;
                self.next_chain_tip_block_number = next_block;
                self.clear();
            } else {
                // ensure already validated headers are in range; the queue is sorted from high to
                // low, so all out-of-range headers sit at the front
                let skip = self
                    .queued_validated_headers
                    .iter()
                    .take_while(|last| last.number() > target_block_number)
                    .count();
                // removes all headers that are higher than current target
                self.queued_validated_headers.drain(..skip);
            }
        } else {
            // this occurs on the initial sync target request
            self.next_request_block_number = next_block;
            self.next_chain_tip_block_number = next_block;
        }
    }
373
    /// Handles the response for the request for the sync target
    ///
    /// On success this records the target's block number (unblocking concurrent range requests)
    /// and queues the sealed target header as the first validated header.
    fn on_sync_target_outcome(
        &mut self,
        response: HeadersRequestOutcome<H::Header>,
    ) -> Result<(), ReverseHeadersDownloaderError<H::Header>> {
        let sync_target = self.existing_sync_target();
        let HeadersRequestOutcome { request, outcome } = response;
        match outcome {
            Ok(res) => {
                let (peer_id, mut headers) = res.split();

                // update total downloaded metric
                self.metrics.total_downloaded.increment(headers.len() as u64);

                // sort headers from highest to lowest block number
                headers.sort_unstable_by_key(|h| Reverse(h.number()));

                if headers.is_empty() {
                    return Err(HeadersResponseError {
                        request,
                        peer_id: Some(peer_id),
                        error: DownloadError::EmptyResponse,
                    }
                    .into())
                }

                // after the descending sort the first entry is the highest header: the target
                let header = headers.swap_remove(0);
                let target = SealedHeader::seal_slow(header);

                // the received header must match the target we asked for, by hash or by number
                match sync_target {
                    SyncTargetBlock::Hash(hash) | SyncTargetBlock::HashAndNumber { hash, .. } => {
                        if target.hash() != hash {
                            return Err(HeadersResponseError {
                                request,
                                peer_id: Some(peer_id),
                                error: DownloadError::InvalidTip(
                                    GotExpected { got: target.hash(), expected: hash }.into(),
                                ),
                            }
                            .into())
                        }
                    }
                    SyncTargetBlock::Number(number) => {
                        if target.number() != number {
                            return Err(HeadersResponseError {
                                request,
                                peer_id: Some(peer_id),
                                error: DownloadError::InvalidTipNumber(GotExpected {
                                    got: target.number(),
                                    expected: number,
                                }),
                            }
                            .into())
                        }
                    }
                }

                trace!(target: "downloaders::headers", head=?self.local_block_number(), hash=?target.hash(), number=%target.number(), "Received sync target");

                // This is the next block we need to start issuing requests from
                let parent_block_number = target.number().saturating_sub(1);
                self.on_block_number_update(target.number(), parent_block_number);

                self.queued_validated_headers.push(target);

                // try to validate all buffered responses blocked by this successful response
                self.try_validate_buffered()
                    .map(Err::<(), ReverseHeadersDownloaderError<H::Header>>)
                    .transpose()?;

                Ok(())
            }
            Err(err) => {
                Err(HeadersResponseError { request, peer_id: None, error: err.into() }.into())
            }
        }
    }
451
    /// Invoked when we received a response
    ///
    /// A valid response is either processed immediately (if it is the next expected range) or
    /// buffered until the responses for the intermediate ranges arrive.
    fn on_headers_outcome(
        &mut self,
        response: HeadersRequestOutcome<H::Header>,
    ) -> Result<(), ReverseHeadersDownloaderError<H::Header>> {
        let requested_block_number = response.block_number();
        let HeadersRequestOutcome { request, outcome } = response;

        match outcome {
            Ok(res) => {
                let (peer_id, mut headers) = res.split();

                // update total downloaded metric
                self.metrics.total_downloaded.increment(headers.len() as u64);

                trace!(target: "downloaders::headers", len=%headers.len(), "Received headers response");

                if headers.is_empty() {
                    return Err(HeadersResponseError {
                        request,
                        peer_id: Some(peer_id),
                        error: DownloadError::EmptyResponse,
                    }
                    .into())
                }

                // the peer must deliver exactly the number of headers we asked for
                if (headers.len() as u64) != request.limit {
                    return Err(HeadersResponseError {
                        peer_id: Some(peer_id),
                        error: DownloadError::HeadersResponseTooShort(GotExpected {
                            got: headers.len() as u64,
                            expected: request.limit,
                        }),
                        request,
                    }
                    .into())
                }

                // sort headers from highest to lowest block number
                headers.sort_unstable_by_key(|h| Reverse(h.number()));

                // validate the response
                let highest = &headers[0];

                trace!(target: "downloaders::headers", requested_block_number, highest=?highest.number(), "Validating non-empty headers response");

                if highest.number() != requested_block_number {
                    return Err(HeadersResponseError {
                        request,
                        peer_id: Some(peer_id),
                        error: DownloadError::HeadersResponseStartBlockMismatch(GotExpected {
                            got: highest.number(),
                            expected: requested_block_number,
                        }),
                    }
                    .into())
                }

                // check if the response is the next expected
                if highest.number() == self.next_chain_tip_block_number {
                    // is next response, validate it
                    self.process_next_headers(request, headers, peer_id)?;
                    // try to validate all buffered responses blocked by this successful response
                    self.try_validate_buffered()
                        .map(Err::<(), ReverseHeadersDownloaderError<H::Header>>)
                        .transpose()?;
                } else if highest.number() > self.existing_local_block_number() {
                    self.metrics.buffered_responses.increment(1.);
                    // can't validate yet
                    self.buffered_responses.push(OrderedHeadersResponse {
                        headers,
                        request,
                        peer_id,
                    })
                }
                // otherwise the response is at or below the local head and no longer needed: it
                // is dropped here

                Ok(())
            }
            // most likely a noop, because this error
            // would've been handled by the fetcher internally
            Err(err) => {
                trace!(target: "downloaders::headers", %err, "Response error");
                Err(HeadersResponseError { request, peer_id: None, error: err.into() }.into())
            }
        }
    }
538
539    fn penalize_peer(&self, peer_id: Option<PeerId>, error: &DownloadError) {
540        // Penalize the peer for bad response
541        if let Some(peer_id) = peer_id {
542            trace!(target: "downloaders::headers", ?peer_id, %error, "Penalizing peer");
543            self.client.report_bad_message(peer_id);
544        }
545    }
546
547    /// Handles the error of a bad response
548    ///
549    /// This will re-submit the request.
550    fn on_headers_error(&self, err: Box<HeadersResponseError>) {
551        let HeadersResponseError { request, peer_id, error } = *err;
552
553        self.penalize_peer(peer_id, &error);
554
555        // Update error metric
556        self.metrics.increment_errors(&error);
557
558        // Re-submit the request
559        self.submit_request(request, Priority::High);
560    }
561
    /// Attempts to validate the buffered responses
    ///
    /// Pops buffered responses as long as they line up with the next expected block number.
    ///
    /// Returns an error if the next expected response was popped, but failed validation.
    fn try_validate_buffered(&mut self) -> Option<ReverseHeadersDownloaderError<H::Header>> {
        loop {
            // Check to see if we've already received the next value
            let next_response = self.buffered_responses.peek_mut()?;
            let next_block_number = next_response.block_number();
            match next_block_number.cmp(&self.next_chain_tip_block_number) {
                // best buffered response is below the next expected block: nothing to do yet
                Ordering::Less => return None,
                Ordering::Equal => {
                    // this is the next expected response: validate and queue it
                    let OrderedHeadersResponse { headers, request, peer_id } =
                        PeekMut::pop(next_response);
                    self.metrics.buffered_responses.decrement(1.);

                    if let Err(err) = self.process_next_headers(request, headers, peer_id) {
                        return Some(err)
                    }
                }
                Ordering::Greater => {
                    // response is above the next expected block, i.e. for a range that's no
                    // longer needed: discard it
                    self.metrics.buffered_responses.decrement(1.);
                    PeekMut::pop(next_response);
                }
            }
        }
    }
588
    /// Returns the request for the `sync_target` header.
    ///
    /// A single falling request for just the target block itself; its response yields the
    /// target's block number, which unlocks issuing concurrent range requests.
    const fn get_sync_target_request(&self, start: BlockHashOrNumber) -> HeadersRequest {
        HeadersRequest::falling(start, 1)
    }
593
594    /// Starts a request future
595    fn submit_request(&self, request: HeadersRequest, priority: Priority) {
596        trace!(target: "downloaders::headers", ?request, "Submitting headers request");
597        self.in_progress_queue.push(self.request_fut(request, priority));
598        self.metrics.in_flight_requests.increment(1.);
599    }
600
601    fn request_fut(
602        &self,
603        request: HeadersRequest,
604        priority: Priority,
605    ) -> HeadersRequestFuture<H::Output> {
606        let client = Arc::clone(&self.client);
607        HeadersRequestFuture {
608            request: Some(request.clone()),
609            fut: client.get_headers_with_priority(request, priority),
610        }
611    }
612
    /// Validate whether the header is valid in relation to its parent
    ///
    /// Delegates to [`validate_header_download`] with the configured consensus validator.
    fn validate(
        &self,
        header: &SealedHeader<H::Header>,
        parent: &SealedHeader<H::Header>,
    ) -> DownloadResult<()> {
        validate_header_download(&self.consensus, header, parent)
    }
621
622    /// Clears all requests/responses.
623    fn clear(&mut self) {
624        self.lowest_validated_header.take();
625        self.queued_validated_headers = Vec::new();
626        self.buffered_responses = BinaryHeap::new();
627        self.in_progress_queue.clear();
628
629        self.metrics.in_flight_requests.set(0.);
630        self.metrics.buffered_responses.set(0.);
631    }
632
    /// Splits off the next batch of headers
    ///
    /// The returned headers are sorted from highest to lowest block number, mirroring the order
    /// of `queued_validated_headers`.
    fn split_next_batch(&mut self) -> Vec<SealedHeader<H::Header>> {
        let batch_size = self.stream_batch_size.min(self.queued_validated_headers.len());
        let mut rem = self.queued_validated_headers.split_off(batch_size);
        std::mem::swap(&mut rem, &mut self.queued_validated_headers);
        // If the downloader consumer does not flush headers at the same rate that the downloader
        // queues them, then the `queued_validated_headers` buffer can grow unbounded.
        //
        // The semantics of `split_off` state that the capacity of the original buffer is
        // unchanged, so queued_validated_headers will then have only `batch_size` elements, and
        // its original capacity. Because `rem` is initially populated with elements `[batch_size,
        // len)` of `queued_validated_headers`, it will have a capacity of at least `len -
        // batch_size`, and the total memory allocated by the two buffers will be around double the
        // original size of `queued_validated_headers`.
        //
        // These are then mem::swapped, leaving `rem` with a large capacity, but small length.
        //
        // To prevent these allocations from leaking to the consumer, we shrink the capacity of the
        // new buffer. The total memory allocated should then be not much more than the original
        // size of `queued_validated_headers`.
        rem.shrink_to_fit();
        rem
    }
656}
657
658impl<H> ReverseHeadersDownloader<H>
659where
660    H: HeadersClient,
661    Self: HeaderDownloader + 'static,
662{
663    /// Spawns the downloader task via [`tokio::task::spawn`]
664    pub fn into_task(self) -> TaskDownloader<<Self as HeaderDownloader>::Header> {
665        self.into_task_with(&TokioTaskExecutor::default())
666    }
667
668    /// Convert the downloader into a [`TaskDownloader`] by spawning it via the given `spawner`.
669    pub fn into_task_with<S>(
670        self,
671        spawner: &S,
672    ) -> TaskDownloader<<Self as HeaderDownloader>::Header>
673    where
674        S: TaskSpawner,
675    {
676        TaskDownloader::spawn_with(self, spawner)
677    }
678}
679
impl<H> HeaderDownloader for ReverseHeadersDownloader<H>
where
    H: HeadersClient<Header: reth_primitives_traits::BlockHeader> + 'static,
{
    type Header = H::Header;

    /// Updates the lower boundary (local head) the downloader syncs towards.
    ///
    /// Since headers are downloaded in reverse, any already-validated headers at or below the
    /// new head can no longer be yielded and are dropped.
    fn update_local_head(&mut self, head: SealedHeader<H::Header>) {
        // ensure we're only yielding headers that are in range and follow the current local head.
        while self
            .queued_validated_headers
            .last()
            .is_some_and(|last| last.number() <= head.number())
        {
            // headers are sorted high to low, so out-of-range entries sit at the end
            self.queued_validated_headers.pop();
        }
        trace!(
            target: "downloaders::headers",
            head=?head.num_hash(),
            "Updating local head"
        );
        // update the local head
        self.local_head = Some(head);
    }

    /// If the given target is different from the current target, we need to update the sync target
    fn update_sync_target(&mut self, target: SyncTarget) {
        let current_tip = self.sync_target.as_ref().and_then(|t| t.hash());
        trace!(
            target: "downloaders::headers",
            sync_target=?target,
            current_tip=?current_tip,
            "Updating sync target"
        );
        match target {
            SyncTarget::Tip(tip) => {
                if Some(tip) != current_tip {
                    trace!(target: "downloaders::headers", current=?current_tip, new=?tip, "Update sync target");
                    let new_sync_target = SyncTargetBlock::from_hash(tip);

                    // if the new sync target is the next queued request we don't need to re-start
                    // the target update
                    if let Some(target_number) = self
                        .queued_validated_headers
                        .first()
                        .filter(|h| h.hash() == tip)
                        .map(|h| h.number())
                    {
                        self.sync_target = Some(new_sync_target.with_number(target_number));
                        return
                    }

                    // the block number for the tip hash is unknown: fetch it with a
                    // high-priority request so the request range can be computed
                    trace!(target: "downloaders::headers", new=?target, "Request new sync target");
                    self.metrics.out_of_order_requests.increment(1);
                    self.sync_target = Some(new_sync_target);
                    self.sync_target_request = Some(
                        self.request_fut(self.get_sync_target_request(tip.into()), Priority::High),
                    );
                }
            }
            SyncTarget::Gap(existing) => {
                let target = existing.parent;
                if Some(target) != current_tip {
                    // there could be a sync target request in progress
                    self.sync_target_request.take();
                    // If the target has changed, update the request pointers based on the new
                    // targeted block number
                    let parent_block_number = existing.block.number.saturating_sub(1);

                    trace!(target: "downloaders::headers", current=?current_tip, new=?target, %parent_block_number, "Updated sync target");

                    // Update the sync target hash
                    self.sync_target = match self.sync_target.take() {
                        Some(sync_target) => Some(sync_target.with_hash(target)),
                        None => Some(SyncTargetBlock::from_hash(target)),
                    };
                    self.on_block_number_update(parent_block_number, parent_block_number);
                }
            }
            SyncTarget::TipNum(num) => {
                let current_tip_num = self.sync_target.as_ref().and_then(|t| t.number());
                if Some(num) != current_tip_num {
                    trace!(target: "downloaders::headers", %num, "Updating sync target based on num");
                    // just update the sync target; the corresponding hash is resolved via a
                    // high-priority request for the header at that number
                    self.sync_target = Some(SyncTargetBlock::from_number(num));
                    self.sync_target_request = Some(
                        self.request_fut(self.get_sync_target_request(num.into()), Priority::High),
                    );
                }
            }
        }
    }

    /// Sets how many validated headers are yielded per `Stream` batch.
    fn set_batch_size(&mut self, batch_size: usize) {
        self.stream_batch_size = batch_size;
    }
}
777
impl<H> Stream for ReverseHeadersDownloader<H>
where
    H: HeadersClient<Header: reth_primitives_traits::BlockHeader> + 'static,
{
    type Item = HeadersDownloaderResult<Vec<SealedHeader<H::Header>>, H::Header>;

    /// Drives the downloader: resolves any pending sync target request first, then keeps the
    /// request queue at capacity and yields batches of validated headers (sorted high to low).
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();

        // The downloader boundaries (local head and sync target) have to be set in order
        // to start downloading data.
        if this.local_head.is_none() || this.sync_target.is_none() {
            trace!(
                target: "downloaders::headers",
                head=?this.local_block_number(),
                sync_target=?this.sync_target,
                "The downloader sync boundaries have not been set"
            );
            return Poll::Pending
        }

        // If we have a new tip request we need to complete that first before we send batched
        // requests
        while let Some(mut req) = this.sync_target_request.take() {
            match req.poll_unpin(cx) {
                Poll::Ready(outcome) => {
                    match this.on_sync_target_outcome(outcome) {
                        Ok(()) => break,
                        Err(ReverseHeadersDownloaderError::Response(error)) => {
                            trace!(target: "downloaders::headers", %error, "invalid sync target response");
                            if error.is_channel_closed() {
                                // download channel closed which means the network was dropped
                                return Poll::Ready(None)
                            }

                            this.penalize_peer(error.peer_id, &error.error);
                            this.metrics.increment_errors(&error.error);
                            // retry the sync target request with high priority
                            this.sync_target_request =
                                Some(this.request_fut(error.request, Priority::High));
                        }
                        Err(ReverseHeadersDownloaderError::Downloader(error)) => {
                            // fatal downloader error: reset internal state and surface it
                            this.clear();
                            return Poll::Ready(Some(Err(error)))
                        }
                    };
                }
                Poll::Pending => {
                    this.sync_target_request = Some(req);
                    return Poll::Pending
                }
            }
        }

        // shrink the buffer after handling sync target outcomes
        this.buffered_responses.shrink_to_fit();

        // This loop submits new requests and polls them; if a new batch is ready it is returned.
        // The actual work is done by the receiver of the request channel; polling the request
        // future is just reading from a `oneshot::Receiver`. Hence, this loop tries to keep the
        // downloader at capacity at all times. The order of operations is as follows:
        // 1. poll futures to make room for followup requests (this also prepares validated
        //    headers for 3.)
        // 2. exhaust all capacity by sending requests
        // 3. return batch, if enough validated
        // 4. return Pending if 2. did not submit a new request, else continue
        loop {
            // poll requests
            while let Poll::Ready(Some(outcome)) = this.in_progress_queue.poll_next_unpin(cx) {
                this.metrics.in_flight_requests.decrement(1.);
                // handle response
                match this.on_headers_outcome(outcome) {
                    Ok(()) => (),
                    Err(ReverseHeadersDownloaderError::Response(error)) => {
                        if error.is_channel_closed() {
                            // download channel closed which means the network was dropped
                            return Poll::Ready(None)
                        }
                        this.on_headers_error(error);
                    }
                    Err(ReverseHeadersDownloaderError::Downloader(error)) => {
                        this.clear();
                        return Poll::Ready(Some(Err(error)))
                    }
                };
            }

            // shrink the buffer after handling headers outcomes
            this.buffered_responses.shrink_to_fit();

            // marks the loop's exit condition: exit if no requests submitted
            let mut progress = false;

            let concurrent_request_limit = this.concurrent_request_limit();
            // populate requests, bounded by the concurrency limit and the response buffer cap
            while this.in_progress_queue.len() < concurrent_request_limit &&
                this.buffered_responses.len() < this.max_buffered_responses
            {
                if let Some(request) = this.next_request() {
                    trace!(
                        target: "downloaders::headers",
                        "Requesting headers {request:?}"
                    );
                    progress = true;
                    this.submit_request(request, Priority::Normal);
                } else {
                    // no more requests
                    break
                }
            }

            // yield next batch
            if this.queued_validated_headers.len() >= this.stream_batch_size {
                let next_batch = this.split_next_batch();

                // Note: if this would drain all headers, we need to keep the lowest (last index)
                // around so we can continue validating headers responses.
                if this.queued_validated_headers.is_empty() {
                    this.lowest_validated_header = next_batch.last().cloned();
                }

                trace!(target: "downloaders::headers", batch=%next_batch.len(), "Returning validated batch");

                this.metrics.total_flushed.increment(next_batch.len() as u64);
                return Poll::Ready(Some(Ok(next_batch)))
            }

            if !progress {
                break
            }
        }

        // all requests are handled, stream is finished
        if this.in_progress_queue.is_empty() {
            let next_batch = this.split_next_batch();
            if next_batch.is_empty() {
                this.clear();
                return Poll::Ready(None)
            }
            this.metrics.total_flushed.increment(next_batch.len() as u64);
            return Poll::Ready(Some(Ok(next_batch)))
        }

        Poll::Pending
    }
}
922
/// A future that returns a list of headers on success.
#[derive(Debug)]
struct HeadersRequestFuture<F> {
    /// The request this future answers; taken (set to `None`) once the future resolves.
    request: Option<HeadersRequest>,
    /// The in-flight response future from the headers client.
    fut: F,
}
929
930impl<F, H> Future for HeadersRequestFuture<F>
931where
932    F: Future<Output = PeerRequestResult<Vec<H>>> + Sync + Send + Unpin,
933{
934    type Output = HeadersRequestOutcome<H>;
935
936    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
937        let this = self.get_mut();
938        let outcome = ready!(this.fut.poll_unpin(cx));
939        let request = this.request.take().unwrap();
940
941        Poll::Ready(HeadersRequestOutcome { request, outcome })
942    }
943}
944
/// The outcome of the [`HeadersRequestFuture`]
struct HeadersRequestOutcome<H> {
    /// The request that produced this outcome.
    request: HeadersRequest,
    /// The peer's response (or error) for the request.
    outcome: PeerRequestResult<Vec<H>>,
}
950
// === impl HeadersRequestOutcome ===
952
impl<H> HeadersRequestOutcome<H> {
    /// Returns the block number the request started at.
    ///
    /// Panics if the request was created with a hash start; callers are expected to only use
    /// this for number-based requests.
    const fn block_number(&self) -> u64 {
        self.request.start.as_number().expect("is number")
    }
}
958
/// Wrapper type to order responses
#[derive(Debug)]
struct OrderedHeadersResponse<H> {
    /// The headers returned by the peer.
    headers: Vec<H>,
    /// The request that produced this response; its start number determines the ordering.
    request: HeadersRequest,
    /// The peer that sent this response.
    peer_id: PeerId,
}
966
967// === impl OrderedHeadersResponse ===
968
impl<H> OrderedHeadersResponse<H> {
    /// Returns the block number the request started at.
    ///
    /// Panics if the request start is a hash; ordering is only defined for number-based
    /// requests.
    const fn block_number(&self) -> u64 {
        self.request.start.as_number().expect("is number")
    }
}
974
/// Responses are considered equal if their requests start at the same block number.
impl<H> PartialEq for OrderedHeadersResponse<H> {
    fn eq(&self, other: &Self) -> bool {
        self.block_number() == other.block_number()
    }
}
980
// Equality is fully determined by the starting block number, so `Eq` holds.
impl<H> Eq for OrderedHeadersResponse<H> {}
982
impl<H> PartialOrd for OrderedHeadersResponse<H> {
    // delegate to `Ord` so the two orderings stay consistent
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
988
impl<H> Ord for OrderedHeadersResponse<H> {
    // orders responses by starting block number, so a `BinaryHeap` pops the response with the
    // highest block number first
    fn cmp(&self, other: &Self) -> Ordering {
        self.block_number().cmp(&other.block_number())
    }
}
994
/// Type returned if a bad response was processed
#[derive(Debug, Error)]
#[error("error requesting headers from peer {peer_id:?}: {error}; request: {request:?}")]
struct HeadersResponseError {
    /// The request that produced the bad response; kept so it can be retried.
    request: HeadersRequest,
    /// The peer that sent the response, if known; used for penalization.
    peer_id: Option<PeerId>,
    /// The underlying download error.
    #[source]
    error: DownloadError,
}
1004
1005impl HeadersResponseError {
1006    /// Returns true if the error was caused by a closed channel to the network.
1007    const fn is_channel_closed(&self) -> bool {
1008        if let DownloadError::RequestError(ref err) = self.error {
1009            return err.is_channel_closed()
1010        }
1011        false
1012    }
1013}
1014
/// The block to which we want to close the gap: (local head...sync target]
/// This tracks the sync target block, so this could be either a block number or hash.
///
/// A hash-only target is upgraded to [`Self::HashAndNumber`] via [`SyncTargetBlock::with_number`]
/// or [`SyncTargetBlock::replace_number`] once the number is known.
#[derive(Clone, Debug)]
pub enum SyncTargetBlock {
    /// Block hash of the targeted block
    Hash(B256),
    /// Block number of the targeted block
    Number(u64),
    /// Both the block hash and number of the targeted block
    HashAndNumber {
        /// Block hash of the targeted block
        hash: B256,
        /// Block number of the targeted block
        number: u64,
    },
}
1031
1032impl SyncTargetBlock {
1033    /// Create new instance from hash.
1034    const fn from_hash(hash: B256) -> Self {
1035        Self::Hash(hash)
1036    }
1037
1038    /// Create new instance from number.
1039    const fn from_number(num: u64) -> Self {
1040        Self::Number(num)
1041    }
1042
1043    /// Set the hash for the sync target.
1044    const fn with_hash(self, hash: B256) -> Self {
1045        match self {
1046            Self::Hash(_) => Self::Hash(hash),
1047            Self::Number(number) | Self::HashAndNumber { number, .. } => {
1048                Self::HashAndNumber { hash, number }
1049            }
1050        }
1051    }
1052
1053    /// Set a number on the instance.
1054    const fn with_number(self, number: u64) -> Self {
1055        match self {
1056            Self::Hash(hash) | Self::HashAndNumber { hash, .. } => {
1057                Self::HashAndNumber { hash, number }
1058            }
1059            Self::Number(_) => Self::Number(number),
1060        }
1061    }
1062
1063    /// Replace the target block number, and return the old block number, if it was set.
1064    ///
1065    /// If the target block is a hash, this be converted into a `HashAndNumber`, but return `None`.
1066    /// The semantics should be equivalent to that of `Option::replace`.
1067    const fn replace_number(&mut self, number: u64) -> Option<u64> {
1068        match self {
1069            Self::Hash(hash) => {
1070                *self = Self::HashAndNumber { hash: *hash, number };
1071                None
1072            }
1073            Self::Number(old_number) => {
1074                let res = Some(*old_number);
1075                *self = Self::Number(number);
1076                res
1077            }
1078            Self::HashAndNumber { number: old_number, hash } => {
1079                let res = Some(*old_number);
1080                *self = Self::HashAndNumber { hash: *hash, number };
1081                res
1082            }
1083        }
1084    }
1085
1086    /// Return the hash of the target block, if it is set.
1087    const fn hash(&self) -> Option<B256> {
1088        match self {
1089            Self::Hash(hash) | Self::HashAndNumber { hash, .. } => Some(*hash),
1090            Self::Number(_) => None,
1091        }
1092    }
1093
1094    /// Return the block number of the sync target, if it is set.
1095    const fn number(&self) -> Option<u64> {
1096        match self {
1097            Self::Hash(_) => None,
1098            Self::Number(number) | Self::HashAndNumber { number, .. } => Some(*number),
1099        }
1100    }
1101}
1102
/// The builder for [`ReverseHeadersDownloader`] with
/// some default settings
#[derive(Debug)]
pub struct ReverseHeadersDownloaderBuilder {
    /// The batch size per one request
    request_limit: u64,
    /// Number of headers yielded per `Stream` batch (the commit granularity)
    stream_batch_size: usize,
    /// Minimum number of concurrent requests to keep active
    min_concurrent_requests: usize,
    /// Maximum number of concurrent requests
    max_concurrent_requests: usize,
    /// How many responses to buffer
    max_buffered_responses: usize,
}
1118
1119impl ReverseHeadersDownloaderBuilder {
1120    /// Creates a new [`ReverseHeadersDownloaderBuilder`] with configurations based on the provided
1121    /// [`HeadersConfig`].
1122    pub fn new(config: HeadersConfig) -> Self {
1123        Self::default()
1124            .request_limit(config.downloader_request_limit)
1125            .min_concurrent_requests(config.downloader_min_concurrent_requests)
1126            .max_concurrent_requests(config.downloader_max_concurrent_requests)
1127            .max_buffered_responses(config.downloader_max_buffered_responses)
1128            .stream_batch_size(config.commit_threshold as usize)
1129    }
1130}
1131
impl Default for ReverseHeadersDownloaderBuilder {
    fn default() -> Self {
        Self {
            stream_batch_size: 10_000,
            // This is just below the max number of headers commonly in a headers response (1024), see also <https://github.com/ethereum/go-ethereum/blob/b0d44338bbcefee044f1f635a84487cbbd8f0538/eth/protocols/eth/handler.go#L38-L40>
            // with ~500bytes per header this is around 0.5MB per request max
            request_limit: 1_000,
            max_concurrent_requests: 100,
            min_concurrent_requests: 5,
            max_buffered_responses: 100,
        }
    }
}
1145
impl ReverseHeadersDownloaderBuilder {
    /// Set the request batch size.
    ///
    /// This determines the `limit` for a `GetBlockHeaders` requests, the number of headers we ask
    /// for.
    pub const fn request_limit(mut self, limit: u64) -> Self {
        self.request_limit = limit;
        self
    }

    /// Set the stream batch size
    ///
    /// This determines the number of headers the [`ReverseHeadersDownloader`] will yield on
    /// `Stream::next`. This will be the amount of headers the headers stage will commit at a
    /// time.
    pub const fn stream_batch_size(mut self, size: usize) -> Self {
        self.stream_batch_size = size;
        self
    }

    /// Set the min amount of concurrent requests.
    ///
    /// If there's capacity the [`ReverseHeadersDownloader`] will keep at least this many requests
    /// active at a time.
    pub const fn min_concurrent_requests(mut self, min_concurrent_requests: usize) -> Self {
        self.min_concurrent_requests = min_concurrent_requests;
        self
    }

    /// Set the max amount of concurrent requests.
    ///
    /// The downloader's concurrent requests won't exceed the given amount.
    pub const fn max_concurrent_requests(mut self, max_concurrent_requests: usize) -> Self {
        self.max_concurrent_requests = max_concurrent_requests;
        self
    }

    /// How many responses to buffer internally.
    ///
    /// This essentially determines how much memory the downloader can use for buffering responses
    /// that arrive out of order. The total number of buffered headers is `request_limit *
    /// max_buffered_responses`. If the [`ReverseHeadersDownloader`]'s buffered responses exceeds
    /// this threshold it waits until there's capacity again before sending new requests.
    pub const fn max_buffered_responses(mut self, max_buffered_responses: usize) -> Self {
        self.max_buffered_responses = max_buffered_responses;
        self
    }

    /// Build [`ReverseHeadersDownloader`] with provided consensus
    /// and header client implementations
    pub fn build<H>(
        self,
        client: H,
        consensus: Arc<dyn HeaderValidator<H::Header>>,
    ) -> ReverseHeadersDownloader<H>
    where
        H: HeadersClient + 'static,
    {
        let Self {
            request_limit,
            stream_batch_size,
            min_concurrent_requests,
            max_concurrent_requests,
            max_buffered_responses,
        } = self;
        ReverseHeadersDownloader {
            consensus,
            client: Arc::new(client),
            // the sync boundaries are set via `update_local_head`/`update_sync_target` before
            // the downloader starts producing headers
            local_head: None,
            sync_target: None,
            // Note: we set these to `0` first, they'll be updated once the sync target response is
            // handled and only used afterwards
            next_request_block_number: 0,
            next_chain_tip_block_number: 0,
            lowest_validated_header: None,
            request_limit,
            min_concurrent_requests,
            max_concurrent_requests,
            stream_batch_size,
            max_buffered_responses,
            sync_target_request: None,
            in_progress_queue: Default::default(),
            buffered_responses: Default::default(),
            queued_validated_headers: Default::default(),
            metrics: Default::default(),
        }
    }
}
1234
1235/// Configures and returns the next [`HeadersRequest`] based on the given parameters
1236///
1237/// The request will start at the given `next_request_block_number` block.
1238/// The `limit` of the request will either be the targeted `request_limit` or the difference of
1239/// `next_request_block_number` and the `local_head` in case this is smaller than the targeted
1240/// `request_limit`.
1241#[inline]
1242fn calc_next_request(
1243    local_head: u64,
1244    next_request_block_number: u64,
1245    request_limit: u64,
1246) -> HeadersRequest {
1247    // downloading is in reverse
1248    let diff = next_request_block_number - local_head;
1249    let limit = diff.min(request_limit);
1250    let start = next_request_block_number;
1251    HeadersRequest::falling(start.into(), limit)
1252}
1253
#[cfg(test)]
mod tests {
    use super::*;
    use crate::headers::test_utils::child_header;
    use alloy_consensus::Header;
    use alloy_eips::{eip1898::BlockWithParent, BlockNumHash};
    use assert_matches::assert_matches;
    use reth_consensus::test_utils::TestConsensus;
    use reth_network_p2p::test_utils::TestHeadersClient;

    /// Tests that `replace_number` works the same way as `Option::replace`
    #[test]
    fn test_replace_number_semantics() {
        struct Fixture {
            // input fields (both SyncTargetBlock and Option<u64>)
            sync_target_block: SyncTargetBlock,
            sync_target_option: Option<u64>,

            // option to replace
            replace_number: u64,

            // expected method result
            expected_result: Option<u64>,

            // output state
            new_number: u64,
        }

        let fixtures = vec![
            Fixture {
                sync_target_block: SyncTargetBlock::Hash(B256::random()),
                // Hash maps to None here, all other variants map to Some
                sync_target_option: None,
                replace_number: 1,
                expected_result: None,
                new_number: 1,
            },
            Fixture {
                sync_target_block: SyncTargetBlock::Number(1),
                sync_target_option: Some(1),
                replace_number: 2,
                expected_result: Some(1),
                new_number: 2,
            },
            Fixture {
                sync_target_block: SyncTargetBlock::HashAndNumber {
                    hash: B256::random(),
                    number: 1,
                },
                sync_target_option: Some(1),
                replace_number: 2,
                expected_result: Some(1),
                new_number: 2,
            },
        ];

        for fixture in fixtures {
            let mut sync_target_block = fixture.sync_target_block;
            let result = sync_target_block.replace_number(fixture.replace_number);
            assert_eq!(result, fixture.expected_result);
            assert_eq!(sync_target_block.number(), Some(fixture.new_number));

            // `Option::replace` must behave identically on the same inputs
            let mut sync_target_option = fixture.sync_target_option;
            let option_result = sync_target_option.replace(fixture.replace_number);
            assert_eq!(option_result, fixture.expected_result);
            assert_eq!(sync_target_option, Some(fixture.new_number));
        }
    }

    /// Tests that the sync target is updated correctly for tip, gap and number targets
    #[test]
    fn test_sync_target_update() {
        let client = Arc::new(TestHeadersClient::default());

        let genesis = SealedHeader::default();

        let mut downloader = ReverseHeadersDownloaderBuilder::default()
            .build(Arc::clone(&client), Arc::new(TestConsensus::default()));
        downloader.update_local_head(genesis);
        downloader.update_sync_target(SyncTarget::Tip(B256::random()));

        downloader.sync_target_request.take();

        // a new tip hash triggers a new sync target request
        let target = SyncTarget::Tip(B256::random());
        downloader.update_sync_target(target);
        assert!(downloader.sync_target_request.is_some());

        downloader.sync_target_request.take();
        // a gap target carries the block number, so no request is needed
        let target = SyncTarget::Gap(BlockWithParent {
            block: BlockNumHash::new(0, B256::random()),
            parent: Default::default(),
        });
        downloader.update_sync_target(target);
        assert!(downloader.sync_target_request.is_none());
        assert_matches!(
            downloader.sync_target,
            Some(target) => target.number().is_some()
        );
    }

    /// Tests that updating the local head drops queued headers at or below the new head
    #[test]
    fn test_head_update() {
        let client = Arc::new(TestHeadersClient::default());

        let header: SealedHeader = SealedHeader::default();

        let mut downloader = ReverseHeadersDownloaderBuilder::default()
            .build(Arc::clone(&client), Arc::new(TestConsensus::default()));
        downloader.update_local_head(header.clone());
        downloader.update_sync_target(SyncTarget::Tip(B256::random()));

        downloader.queued_validated_headers.push(header.clone());
        let mut next = header.as_ref().clone();
        next.number += 1;
        downloader.update_local_head(SealedHeader::new(next, B256::random()));
        assert!(downloader.queued_validated_headers.is_empty());
    }

    /// Tests the request `limit` calculation relative to the local head
    #[test]
    fn test_request_calc() {
        // request an entire batch
        let local = 0;
        let next = 1000;
        let batch_size = 2;
        let request = calc_next_request(local, next, batch_size);
        assert_eq!(request.start, next.into());
        assert_eq!(request.limit, batch_size);

        // only request 1
        let local = 999;
        let next = 1000;
        let batch_size = 2;
        let request = calc_next_request(local, next, batch_size);
        assert_eq!(request.start, next.into());
        assert_eq!(request.limit, 1);
    }

    /// Tests that the downloader generates falling requests down to the local head
    #[test]
    fn test_next_request() {
        let client = Arc::new(TestHeadersClient::default());

        let genesis = SealedHeader::default();

        let batch_size = 99;
        let start = 1000;
        let mut downloader = ReverseHeadersDownloaderBuilder::default()
            .request_limit(batch_size)
            .build(Arc::clone(&client), Arc::new(TestConsensus::default()));
        downloader.update_local_head(genesis);
        downloader.update_sync_target(SyncTarget::Tip(B256::random()));

        downloader.next_request_block_number = start;

        let mut total = 0;
        while let Some(req) = downloader.next_request() {
            assert_eq!(req.start, (start - total).into());
            total += req.limit;
        }
        assert_eq!(total, start);
        assert_eq!(Some(downloader.next_request_block_number), downloader.local_block_number());
    }

    #[test]
    fn test_resp_order() {
        let mut heap = BinaryHeap::new();
        let hi = 1u64;
        heap.push(OrderedHeadersResponse::<Header> {
            headers: vec![],
            request: HeadersRequest { start: hi.into(), limit: 0, direction: Default::default() },
            peer_id: Default::default(),
        });

        let lo = 0u64;
        heap.push(OrderedHeadersResponse {
            headers: vec![],
            request: HeadersRequest { start: lo.into(), limit: 0, direction: Default::default() },
            peer_id: Default::default(),
        });

        // the heap must pop the response with the higher start block first
        assert_eq!(heap.pop().unwrap().block_number(), hi);
        assert_eq!(heap.pop().unwrap().block_number(), lo);
    }

    #[tokio::test]
    async fn download_at_fork_head() {
        reth_tracing::init_test_tracing();

        let client = Arc::new(TestHeadersClient::default());

        let p3 = SealedHeader::default();
        let p2 = child_header(&p3);
        let p1 = child_header(&p2);
        let p0 = child_header(&p1);

        let mut downloader = ReverseHeadersDownloaderBuilder::default()
            .stream_batch_size(3)
            .request_limit(3)
            .build(Arc::clone(&client), Arc::new(TestConsensus::default()));
        downloader.update_local_head(p3.clone());
        downloader.update_sync_target(SyncTarget::Tip(p0.hash()));

        client
            .extend(vec![
                p0.as_ref().clone(),
                p1.as_ref().clone(),
                p2.as_ref().clone(),
                p3.as_ref().clone(),
            ])
            .await;

        let headers = downloader.next().await.unwrap();
        assert_eq!(headers, Ok(vec![p0, p1, p2,]));
        assert!(downloader.buffered_responses.is_empty());
        assert!(downloader.next().await.is_none());
        assert!(downloader.next().await.is_none());
    }

    #[tokio::test]
    async fn download_one_by_one() {
        reth_tracing::init_test_tracing();
        let p3 = SealedHeader::default();
        let p2 = child_header(&p3);
        let p1 = child_header(&p2);
        let p0 = child_header(&p1);

        let client = Arc::new(TestHeadersClient::default());
        let mut downloader = ReverseHeadersDownloaderBuilder::default()
            .stream_batch_size(1)
            .request_limit(1)
            .build(Arc::clone(&client), Arc::new(TestConsensus::default()));
        downloader.update_local_head(p3.clone());
        downloader.update_sync_target(SyncTarget::Tip(p0.hash()));

        client
            .extend(vec![
                p0.as_ref().clone(),
                p1.as_ref().clone(),
                p2.as_ref().clone(),
                p3.as_ref().clone(),
            ])
            .await;

        let headers = downloader.next().await.unwrap();
        assert_eq!(headers, Ok(vec![p0]));
        let headers = headers.unwrap();
        assert_eq!(headers.capacity(), headers.len());

        let headers = downloader.next().await.unwrap();
        assert_eq!(headers, Ok(vec![p1]));
        let headers = headers.unwrap();
        assert_eq!(headers.capacity(), headers.len());

        let headers = downloader.next().await.unwrap();
        assert_eq!(headers, Ok(vec![p2]));
        let headers = headers.unwrap();
        assert_eq!(headers.capacity(), headers.len());

        assert!(downloader.next().await.is_none());
    }

    #[tokio::test]
    async fn download_one_by_one_larger_request_limit() {
        reth_tracing::init_test_tracing();
        let p3 = SealedHeader::default();
        let p2 = child_header(&p3);
        let p1 = child_header(&p2);
        let p0 = child_header(&p1);

        let client = Arc::new(TestHeadersClient::default());
        let mut downloader = ReverseHeadersDownloaderBuilder::default()
            .stream_batch_size(1)
            .request_limit(3)
            .build(Arc::clone(&client), Arc::new(TestConsensus::default()));
        downloader.update_local_head(p3.clone());
        downloader.update_sync_target(SyncTarget::Tip(p0.hash()));

        client
            .extend(vec![
                p0.as_ref().clone(),
                p1.as_ref().clone(),
                p2.as_ref().clone(),
                p3.as_ref().clone(),
            ])
            .await;

        let headers = downloader.next().await.unwrap();
        assert_eq!(headers, Ok(vec![p0]));
        let headers = headers.unwrap();
        assert_eq!(headers.capacity(), headers.len());

        let headers = downloader.next().await.unwrap();
        assert_eq!(headers, Ok(vec![p1]));
        let headers = headers.unwrap();
        assert_eq!(headers.capacity(), headers.len());

        let headers = downloader.next().await.unwrap();
        assert_eq!(headers, Ok(vec![p2]));
        let headers = headers.unwrap();
        assert_eq!(headers.capacity(), headers.len());

        assert!(downloader.next().await.is_none());
    }
}