reth_downloaders/headers/
reverse_headers.rs

1//! A headers downloader that can handle multiple requests concurrently.
2
3use super::task::TaskDownloader;
4use crate::metrics::HeaderDownloaderMetrics;
5use alloy_consensus::BlockHeader;
6use alloy_eips::BlockHashOrNumber;
7use alloy_primitives::{BlockNumber, Sealable, B256};
8use futures::{stream::Stream, FutureExt};
9use futures_util::{stream::FuturesUnordered, StreamExt};
10use rayon::prelude::*;
11use reth_config::config::HeadersConfig;
12use reth_consensus::HeaderValidator;
13use reth_network_p2p::{
14    error::{DownloadError, DownloadResult, PeerRequestResult},
15    headers::{
16        client::{HeadersClient, HeadersRequest},
17        downloader::{validate_header_download, HeaderDownloader, SyncTarget},
18        error::{HeadersDownloaderError, HeadersDownloaderResult},
19    },
20    priority::Priority,
21};
22use reth_network_peers::PeerId;
23use reth_primitives_traits::{GotExpected, SealedHeader};
24use reth_tasks::{TaskSpawner, TokioTaskExecutor};
25use std::{
26    cmp::{Ordering, Reverse},
27    collections::{binary_heap::PeekMut, BinaryHeap},
28    future::Future,
29    pin::Pin,
30    sync::Arc,
31    task::{ready, Context, Poll},
32};
33use thiserror::Error;
34use tracing::{debug, error, trace};
35
/// A heuristic that is used to determine the number of requests that should be prepared for a peer.
/// This should ensure that there are always requests lined up for peers to handle while the
/// downloader is yielding a next batch of headers that is being committed to the database.
///
/// Used by `concurrent_request_limit` to scale the in-flight request target with the number of
/// connected peers.
const REQUESTS_PER_PEER_MULTIPLIER: usize = 5;
40
/// Wrapper for internal downloader errors.
#[derive(Error, Debug)]
enum ReverseHeadersDownloaderError<H: Sealable> {
    /// An error bubbled up from the downloader itself (e.g. a detached head).
    #[error(transparent)]
    Downloader(#[from] HeadersDownloaderError<H>),
    /// A per-response error; these are handled by re-submitting the request
    /// (see `on_headers_error`). Boxed to keep the enum small.
    #[error(transparent)]
    Response(#[from] Box<HeadersResponseError>),
}
49
50impl<H: Sealable> From<HeadersResponseError> for ReverseHeadersDownloaderError<H> {
51    fn from(value: HeadersResponseError) -> Self {
52        Self::Response(Box::new(value))
53    }
54}
55
/// Downloads headers concurrently.
///
/// This [`HeaderDownloader`] downloads headers using the configured [`HeadersClient`].
/// Headers can be requested by hash or block number and take a `limit` parameter. This downloader
/// tries to fill the gap between the local head of the node and the chain tip by issuing multiple
/// requests at a time but yielding them in batches on [`Stream::poll_next`].
///
/// **Note:** This downloader downloads in reverse, see also
/// [`reth_network_p2p::headers::client::HeadersDirection`], this means the batches of headers that
/// this downloader yields will start at the chain tip and move towards the local head: falling
/// block numbers.
#[must_use = "Stream does nothing unless polled"]
#[derive(Debug)]
pub struct ReverseHeadersDownloader<H: HeadersClient> {
    /// Consensus client used to validate headers
    consensus: Arc<dyn HeaderValidator<H::Header>>,
    /// Client used to download headers.
    client: Arc<H>,
    /// The local head of the chain.
    local_head: Option<SealedHeader<H::Header>>,
    /// Block we want to close the gap to.
    sync_target: Option<SyncTargetBlock>,
    /// The block number to use for requests.
    ///
    /// Moves downwards as requests are issued (reverse download).
    next_request_block_number: u64,
    /// Keeps track of the block we need to validate next.
    lowest_validated_header: Option<SealedHeader<H::Header>>,
    /// Tip block number to start validating from (in reverse)
    next_chain_tip_block_number: u64,
    /// The batch size per one request
    request_limit: u64,
    /// Minimum amount of requests to handle concurrently.
    min_concurrent_requests: usize,
    /// Maximum amount of requests to handle concurrently.
    max_concurrent_requests: usize,
    /// The number of block headers to return at once
    stream_batch_size: usize,
    /// Maximum amount of received headers to buffer internally.
    max_buffered_responses: usize,
    /// Contains the request to retrieve the headers for the sync target
    ///
    /// This will give us the block number of the `sync_target`, after which we can send multiple
    /// requests at a time.
    sync_target_request: Option<HeadersRequestFuture<H::Output>>,
    /// requests in progress
    in_progress_queue: FuturesUnordered<HeadersRequestFuture<H::Output>>,
    /// Buffered, unvalidated responses
    buffered_responses: BinaryHeap<OrderedHeadersResponse<H::Header>>,
    /// Buffered, _sorted_ and validated headers ready to be returned.
    ///
    /// Note: headers are sorted from high to low
    queued_validated_headers: Vec<SealedHeader<H::Header>>,
    /// Header downloader metrics.
    metrics: HeaderDownloaderMetrics,
}
110
111// === impl ReverseHeadersDownloader ===
112
113impl<H> ReverseHeadersDownloader<H>
114where
115    H: HeadersClient<Header: reth_primitives_traits::BlockHeader> + 'static,
116{
117    /// Convenience method to create a [`ReverseHeadersDownloaderBuilder`] without importing it
118    pub fn builder() -> ReverseHeadersDownloaderBuilder {
119        ReverseHeadersDownloaderBuilder::default()
120    }
121
122    /// Returns the block number the local node is at.
123    #[inline]
124    fn local_block_number(&self) -> Option<BlockNumber> {
125        self.local_head.as_ref().map(|h| h.number())
126    }
127
128    /// Returns the existing local head block number
129    ///
130    /// # Panics
131    ///
132    /// If the local head has not been set.
133    #[inline]
134    fn existing_local_block_number(&self) -> BlockNumber {
135        self.local_head.as_ref().expect("is initialized").number()
136    }
137
138    /// Returns the existing sync target.
139    ///
140    /// # Panics
141    ///
142    /// If the sync target has never been set.
143    #[inline]
144    fn existing_sync_target(&self) -> SyncTargetBlock {
145        self.sync_target.as_ref().expect("is initialized").clone()
146    }
147
148    /// Max requests to handle at the same time
149    ///
150    /// This depends on the number of active peers but will always be
151    /// [`min_concurrent_requests`..`max_concurrent_requests`]
152    #[inline]
153    fn concurrent_request_limit(&self) -> usize {
154        let num_peers = self.client.num_connected_peers();
155
156        // we try to keep more requests than available peers active so that there's always a
157        // followup request available for a peer
158        let dynamic_target = num_peers * REQUESTS_PER_PEER_MULTIPLIER;
159        let max_dynamic = dynamic_target.max(self.min_concurrent_requests);
160
161        // If only a few peers are connected we keep it low
162        if num_peers < self.min_concurrent_requests {
163            return max_dynamic
164        }
165
166        max_dynamic.min(self.max_concurrent_requests)
167    }
168
169    /// Returns the next header request
170    ///
171    /// This will advance the current block towards the local head.
172    ///
173    /// Returns `None` if no more requests are required.
174    fn next_request(&mut self) -> Option<HeadersRequest> {
175        if let Some(local_head) = self.local_block_number() {
176            if self.next_request_block_number > local_head {
177                let request = calc_next_request(
178                    local_head,
179                    self.next_request_block_number,
180                    self.request_limit,
181                );
182                // need to shift the tracked request block number based on the number of requested
183                // headers so follow-up requests will use that as start.
184                self.next_request_block_number -= request.limit;
185
186                return Some(request)
187            }
188        }
189
190        None
191    }
192
193    /// Returns the next header to use for validation.
194    ///
195    /// Since this downloader downloads blocks with falling block number, this will return the
196    /// lowest (in terms of block number) validated header.
197    ///
198    /// This is either the last `queued_validated_headers`, or if has been drained entirely the
199    /// `lowest_validated_header`.
200    ///
201    /// This only returns `None` if we haven't fetched the initial chain tip yet.
202    fn lowest_validated_header(&self) -> Option<&SealedHeader<H::Header>> {
203        self.queued_validated_headers.last().or(self.lowest_validated_header.as_ref())
204    }
205
206    /// Resets the request trackers and clears the sync target.
207    ///
208    /// This ensures the downloader will restart after a new sync target has been set.
209    fn reset(&mut self) {
210        debug!(target: "downloaders::headers", "Resetting headers downloader");
211        self.next_request_block_number = 0;
212        self.next_chain_tip_block_number = 0;
213        self.sync_target.take();
214    }
215
216    /// Validate that the received header matches the expected sync target.
217    fn validate_sync_target(
218        &self,
219        header: &SealedHeader<H::Header>,
220        request: HeadersRequest,
221        peer_id: PeerId,
222    ) -> Result<(), Box<HeadersResponseError>> {
223        match self.existing_sync_target() {
224            SyncTargetBlock::Hash(hash) | SyncTargetBlock::HashAndNumber { hash, .. }
225                if header.hash() != hash =>
226            {
227                Err(Box::new(HeadersResponseError {
228                    request,
229                    peer_id: Some(peer_id),
230                    error: DownloadError::InvalidTip(
231                        GotExpected { got: header.hash(), expected: hash }.into(),
232                    ),
233                }))
234            }
235            SyncTargetBlock::Number(number) if header.number() != number => {
236                Err(Box::new(HeadersResponseError {
237                    request,
238                    peer_id: Some(peer_id),
239                    error: DownloadError::InvalidTipNumber(GotExpected {
240                        got: header.number(),
241                        expected: number,
242                    }),
243                }))
244            }
245            _ => Ok(()),
246        }
247    }
248
    /// Processes the next headers in line.
    ///
    /// This will validate all headers and insert them into the validated buffer.
    ///
    /// Returns an error if the given headers are invalid.
    ///
    /// Caution: this expects the `headers` to be sorted with _falling_ block numbers
    fn process_next_headers(
        &mut self,
        request: HeadersRequest,
        headers: Vec<H::Header>,
        peer_id: PeerId,
    ) -> Result<(), ReverseHeadersDownloaderError<H::Header>> {
        let mut validated = Vec::with_capacity(headers.len());

        // Sealing (hashing) each header is CPU-heavy, so it is parallelized via rayon before the
        // sequential parent-child validation below.
        let sealed_headers =
            headers.into_par_iter().map(SealedHeader::seal_slow).collect::<Vec<_>>();
        for parent in sealed_headers {
            // Validate that the header is the parent header of the last validated header.
            if let Some(validated_header) =
                validated.last().or_else(|| self.lowest_validated_header())
            {
                if let Err(error) = self.validate(validated_header, &parent) {
                    trace!(target: "downloaders::headers", %error ,"Failed to validate header");
                    return Err(
                        HeadersResponseError { request, peer_id: Some(peer_id), error }.into()
                    )
                }
            } else {
                // No previously validated header exists yet, so this must be the sync target
                // itself; check it against the configured target.
                self.validate_sync_target(&parent, request.clone(), peer_id)?;
            }

            validated.push(parent);
        }

        // If the last (smallest) validated header attaches to the local head, validate it.
        if let Some((last_header, head)) = validated
            .last_mut()
            .zip(self.local_head.as_ref())
            .filter(|(last, head)| last.number() == head.number() + 1)
        {
            // Every header must be valid on its own
            // NOTE(review): the error payload below reports the local head's hash/number rather
            // than `last_header`'s — confirm this is intended.
            if let Err(error) = self.consensus.validate_header(&*last_header) {
                trace!(target: "downloaders::headers", %error, "Failed to validate header");
                return Err(HeadersResponseError {
                    request,
                    peer_id: Some(peer_id),
                    error: DownloadError::HeaderValidation {
                        hash: head.hash(),
                        number: head.number(),
                        error: Box::new(error),
                    },
                }
                .into())
            }

            // If the header is valid on its own, but not against its parent, we return it as
            // detached head error.
            // In stage sync this will trigger an unwind because this means that the local head
            // is not part of the chain the sync target is on. In other words, the downloader was
            // unable to connect the sync target with the local head because the sync target and
            // the local head or on different chains.
            if let Err(error) = self.consensus.validate_header_against_parent(&*last_header, head) {
                let local_head = head.clone();
                // Surface the mismatch as a detached-head error carrying both headers.
                error!(target: "downloaders::headers", %error, number = last_header.number(), hash = ?last_header.hash(), "Header cannot be attached to known canonical chain");

                // Reset trackers so that we can start over the next time the sync target is
                // updated.
                // The expected event flow when that happens is that the node will unwind the local
                // chain and restart the downloader.
                self.reset();

                return Err(HeadersDownloaderError::DetachedHead {
                    local_head: Box::new(local_head),
                    header: Box::new(last_header.clone()),
                    error: Box::new(error),
                }
                .into())
            }
        }

        // update tracked block info (falling block number)
        self.next_chain_tip_block_number =
            validated.last().expect("exists").number().saturating_sub(1);
        self.queued_validated_headers.extend(validated);

        Ok(())
    }
338
    /// Updates the state based on the given `target_block_number`
    ///
    /// There are three different outcomes:
    ///  * This is the first time this is called: current `sync_target` block is still `None`. In
    ///    which case we're initializing the request trackers to `next_block`
    ///  * The `target_block_number` is _higher_ than the current target. In which case we start
    ///    over with a new range
    ///  * The `target_block_number` is _lower_ than the current target or the _same_. In which case
    ///    we don't need to update the request trackers but need to ensure already buffered headers
    ///    are _not_ higher than the new `target_block_number`.
    fn on_block_number_update(&mut self, target_block_number: u64, next_block: u64) {
        // Update the trackers
        if let Some(old_target) =
            self.sync_target.as_mut().and_then(|t| t.replace_number(target_block_number))
        {
            if target_block_number > old_target {
                // the new target is higher than the old target we need to update the
                // request tracker and reset everything
                self.next_request_block_number = next_block;
                self.next_chain_tip_block_number = next_block;
                // drops all buffered responses, in-flight requests and queued headers
                self.clear();
            } else {
                // ensure already validated headers are in range
                // (queued headers are sorted high -> low, so out-of-range ones are at the front)
                let skip = self
                    .queued_validated_headers
                    .iter()
                    .take_while(|last| last.number() > target_block_number)
                    .count();
                // removes all headers that are higher than current target
                self.queued_validated_headers.drain(..skip);
            }
        } else {
            // this occurs on the initial sync target request
            self.next_request_block_number = next_block;
            self.next_chain_tip_block_number = next_block;
        }
    }
376
    /// Handles the response for the request for the sync target
    ///
    /// On success this verifies the received header against the configured sync target
    /// (by hash or number), initializes the request trackers from the target's block number,
    /// and queues the target as the first validated header.
    fn on_sync_target_outcome(
        &mut self,
        response: HeadersRequestOutcome<H::Header>,
    ) -> Result<(), ReverseHeadersDownloaderError<H::Header>> {
        let sync_target = self.existing_sync_target();
        let HeadersRequestOutcome { request, outcome } = response;
        match outcome {
            Ok(res) => {
                let (peer_id, mut headers) = res.split();

                // update total downloaded metric
                self.metrics.total_downloaded.increment(headers.len() as u64);

                // sort headers from highest to lowest block number
                headers.sort_unstable_by_key(|h| Reverse(h.number()));

                if headers.is_empty() {
                    return Err(HeadersResponseError {
                        request,
                        peer_id: Some(peer_id),
                        error: DownloadError::EmptyResponse,
                    }
                    .into())
                }

                // after the descending sort, index 0 holds the highest header: the sync target
                let header = headers.swap_remove(0);
                let target = SealedHeader::seal_slow(header);

                match sync_target {
                    SyncTargetBlock::Hash(hash) | SyncTargetBlock::HashAndNumber { hash, .. } => {
                        if target.hash() != hash {
                            return Err(HeadersResponseError {
                                request,
                                peer_id: Some(peer_id),
                                error: DownloadError::InvalidTip(
                                    GotExpected { got: target.hash(), expected: hash }.into(),
                                ),
                            }
                            .into())
                        }
                    }
                    SyncTargetBlock::Number(number) => {
                        if target.number() != number {
                            return Err(HeadersResponseError {
                                request,
                                peer_id: Some(peer_id),
                                error: DownloadError::InvalidTipNumber(GotExpected {
                                    got: target.number(),
                                    expected: number,
                                }),
                            }
                            .into())
                        }
                    }
                }

                trace!(target: "downloaders::headers", head=?self.local_block_number(), hash=?target.hash(), number=%target.number(), "Received sync target");

                // This is the next block we need to start issuing requests from
                let parent_block_number = target.number().saturating_sub(1);
                self.on_block_number_update(target.number(), parent_block_number);

                self.queued_validated_headers.push(target);

                // try to validate all buffered responses blocked by this successful response
                self.try_validate_buffered()
                    .map(Err::<(), ReverseHeadersDownloaderError<H::Header>>)
                    .transpose()?;

                Ok(())
            }
            // request-level failure: no peer to blame, propagate as a response error
            Err(err) => {
                Err(HeadersResponseError { request, peer_id: None, error: err.into() }.into())
            }
        }
    }
454
    /// Invoked when we received a response
    ///
    /// Performs sanity checks on the response (non-empty, full `limit`, expected start block),
    /// then either validates it immediately when it is the next expected response, or buffers it
    /// for later if it arrived out of order.
    fn on_headers_outcome(
        &mut self,
        response: HeadersRequestOutcome<H::Header>,
    ) -> Result<(), ReverseHeadersDownloaderError<H::Header>> {
        let requested_block_number = response.block_number();
        let HeadersRequestOutcome { request, outcome } = response;

        match outcome {
            Ok(res) => {
                let (peer_id, mut headers) = res.split();

                // update total downloaded metric
                self.metrics.total_downloaded.increment(headers.len() as u64);

                trace!(target: "downloaders::headers", len=%headers.len(), "Received headers response");

                if headers.is_empty() {
                    return Err(HeadersResponseError {
                        request,
                        peer_id: Some(peer_id),
                        error: DownloadError::EmptyResponse,
                    }
                    .into())
                }

                // a partial response would leave a gap in the requested range
                if (headers.len() as u64) != request.limit {
                    return Err(HeadersResponseError {
                        peer_id: Some(peer_id),
                        error: DownloadError::HeadersResponseTooShort(GotExpected {
                            got: headers.len() as u64,
                            expected: request.limit,
                        }),
                        request,
                    }
                    .into())
                }

                // sort headers from highest to lowest block number
                headers.sort_unstable_by_key(|h| Reverse(h.number()));

                // validate the response
                let highest = &headers[0];

                trace!(target: "downloaders::headers", requested_block_number, highest=?highest.number(), "Validating non-empty headers response");

                if highest.number() != requested_block_number {
                    return Err(HeadersResponseError {
                        request,
                        peer_id: Some(peer_id),
                        error: DownloadError::HeadersResponseStartBlockMismatch(GotExpected {
                            got: highest.number(),
                            expected: requested_block_number,
                        }),
                    }
                    .into())
                }

                // check if the response is the next expected
                if highest.number() == self.next_chain_tip_block_number {
                    // is next response, validate it
                    self.process_next_headers(request, headers, peer_id)?;
                    // try to validate all buffered responses blocked by this successful response
                    self.try_validate_buffered()
                        .map(Err::<(), ReverseHeadersDownloaderError<H::Header>>)
                        .transpose()?;
                } else if highest.number() > self.existing_local_block_number() {
                    self.metrics.buffered_responses.increment(1.);
                    // can't validate yet
                    self.buffered_responses.push(OrderedHeadersResponse {
                        headers,
                        request,
                        peer_id,
                    })
                }
                // responses at or below the local head fall through and are dropped silently

                Ok(())
            }
            // most likely a noop, because this error
            // would've been handled by the fetcher internally
            Err(err) => {
                trace!(target: "downloaders::headers", %err, "Response error");
                Err(HeadersResponseError { request, peer_id: None, error: err.into() }.into())
            }
        }
    }
541
542    fn penalize_peer(&self, peer_id: Option<PeerId>, error: &DownloadError) {
543        // Penalize the peer for bad response
544        if let Some(peer_id) = peer_id {
545            trace!(target: "downloaders::headers", ?peer_id, %error, "Penalizing peer");
546            self.client.report_bad_message(peer_id);
547        }
548    }
549
    /// Handles the error of a bad response
    ///
    /// This will re-submit the request.
    fn on_headers_error(&self, err: Box<HeadersResponseError>) {
        let HeadersResponseError { request, peer_id, error } = *err;

        // Penalize the offending peer (no-op when the error carries no peer id)
        self.penalize_peer(peer_id, &error);

        // Update error metric
        self.metrics.increment_errors(&error);

        // Re-submit the request with high priority so the gap gets filled quickly
        self.submit_request(request, Priority::High);
    }
564
    /// Attempts to validate the buffered responses
    ///
    /// Returns an error if the next expected response was popped, but failed validation.
    fn try_validate_buffered(&mut self) -> Option<ReverseHeadersDownloaderError<H::Header>> {
        loop {
            // Check to see if we've already received the next value.
            // `peek_mut` yields the greatest buffered response per the heap's ordering —
            // presumably keyed by block number; confirm against `OrderedHeadersResponse`'s
            // `Ord` impl.
            let next_response = self.buffered_responses.peek_mut()?;
            let next_block_number = next_response.block_number();
            match next_block_number.cmp(&self.next_chain_tip_block_number) {
                // everything buffered is below the next expected block: nothing to do yet
                Ordering::Less => return None,
                Ordering::Equal => {
                    // exactly the next expected response: pop it off the heap and validate it
                    let OrderedHeadersResponse { headers, request, peer_id } =
                        PeekMut::pop(next_response);
                    self.metrics.buffered_responses.decrement(1.);

                    if let Err(err) = self.process_next_headers(request, headers, peer_id) {
                        return Some(err)
                    }
                }
                Ordering::Greater => {
                    // buffered response is above the next expected block (stale): discard it
                    self.metrics.buffered_responses.decrement(1.);
                    PeekMut::pop(next_response);
                }
            }
        }
    }
591
    /// Returns the request for the `sync_target` header.
    ///
    /// A falling request with a limit of 1 fetches exactly the target header itself.
    const fn get_sync_target_request(&self, start: BlockHashOrNumber) -> HeadersRequest {
        HeadersRequest::falling(start, 1)
    }
596
597    /// Starts a request future
598    fn submit_request(&self, request: HeadersRequest, priority: Priority) {
599        trace!(target: "downloaders::headers", ?request, "Submitting headers request");
600        self.in_progress_queue.push(self.request_fut(request, priority));
601        self.metrics.in_flight_requests.increment(1.);
602    }
603
604    fn request_fut(
605        &self,
606        request: HeadersRequest,
607        priority: Priority,
608    ) -> HeadersRequestFuture<H::Output> {
609        let client = Arc::clone(&self.client);
610        HeadersRequestFuture {
611            request: Some(request.clone()),
612            fut: client.get_headers_with_priority(request, priority),
613        }
614    }
615
    /// Validate whether the header is valid in relation to its parent
    ///
    /// Delegates to [`validate_header_download`] with the configured consensus validator.
    fn validate(
        &self,
        header: &SealedHeader<H::Header>,
        parent: &SealedHeader<H::Header>,
    ) -> DownloadResult<()> {
        validate_header_download(&self.consensus, header, parent)
    }
624
625    /// Clears all requests/responses.
626    fn clear(&mut self) {
627        self.lowest_validated_header.take();
628        self.queued_validated_headers = Vec::new();
629        self.buffered_responses = BinaryHeap::new();
630        self.in_progress_queue.clear();
631
632        self.metrics.in_flight_requests.set(0.);
633        self.metrics.buffered_responses.set(0.);
634    }
635
    /// Splits off the next batch of headers
    ///
    /// Returns up to `stream_batch_size` of the queued validated headers (the front of the
    /// high-to-low sorted queue), leaving the remainder queued.
    fn split_next_batch(&mut self) -> Vec<SealedHeader<H::Header>> {
        let batch_size = self.stream_batch_size.min(self.queued_validated_headers.len());
        // after split_off + swap: `rem` holds the first `batch_size` headers (in the original,
        // large-capacity allocation) and `queued_validated_headers` holds the remainder
        let mut rem = self.queued_validated_headers.split_off(batch_size);
        std::mem::swap(&mut rem, &mut self.queued_validated_headers);
        // If the downloader consumer does not flush headers at the same rate that the downloader
        // queues them, then the `queued_validated_headers` buffer can grow unbounded.
        //
        // The semantics of `split_off` state that the capacity of the original buffer is
        // unchanged, so queued_validated_headers will then have only `batch_size` elements, and
        // its original capacity. Because `rem` is initially populated with elements `[batch_size,
        // len)` of `queued_validated_headers`, it will have a capacity of at least `len -
        // batch_size`, and the total memory allocated by the two buffers will be around double the
        // original size of `queued_validated_headers`.
        //
        // These are then mem::swapped, leaving `rem` with a large capacity, but small length.
        //
        // To prevent these allocations from leaking to the consumer, we shrink the capacity of the
        // new buffer. The total memory allocated should then be not much more than the original
        // size of `queued_validated_headers`.
        rem.shrink_to_fit();
        rem
    }
659}
660
661impl<H> ReverseHeadersDownloader<H>
662where
663    H: HeadersClient,
664    Self: HeaderDownloader + 'static,
665{
666    /// Spawns the downloader task via [`tokio::task::spawn`]
667    pub fn into_task(self) -> TaskDownloader<<Self as HeaderDownloader>::Header> {
668        self.into_task_with(&TokioTaskExecutor::default())
669    }
670
671    /// Convert the downloader into a [`TaskDownloader`] by spawning it via the given `spawner`.
672    pub fn into_task_with<S>(
673        self,
674        spawner: &S,
675    ) -> TaskDownloader<<Self as HeaderDownloader>::Header>
676    where
677        S: TaskSpawner,
678    {
679        TaskDownloader::spawn_with(self, spawner)
680    }
681}
682
impl<H> HeaderDownloader for ReverseHeadersDownloader<H>
where
    H: HeadersClient<Header: reth_primitives_traits::BlockHeader> + 'static,
{
    type Header = H::Header;

    /// Updates the local head and prunes any queued headers that are at or below it, since
    /// those can no longer be yielded.
    fn update_local_head(&mut self, head: SealedHeader<H::Header>) {
        // ensure we're only yielding headers that are in range and follow the current local head.
        while self
            .queued_validated_headers
            .last()
            .is_some_and(|last| last.number() <= head.number())
        {
            // headers are sorted high to low, so the lowest queued header is at the end
            self.queued_validated_headers.pop();
        }
        trace!(
            target: "downloaders::headers",
            head=?head.num_hash(),
            "Updating local head"
        );
        // update the local head
        self.local_head = Some(head);
    }

    /// If the given target is different from the current target, we need to update the sync target
    fn update_sync_target(&mut self, target: SyncTarget) {
        let current_tip = self.sync_target.as_ref().and_then(|t| t.hash());
        trace!(
            target: "downloaders::headers",
            sync_target=?target,
            current_tip=?current_tip,
            "Updating sync target"
        );
        match target {
            SyncTarget::Tip(tip) => {
                if Some(tip) != current_tip {
                    trace!(target: "downloaders::headers", current=?current_tip, new=?tip, "Update sync target");
                    let new_sync_target = SyncTargetBlock::from_hash(tip);

                    // if the new sync target is the next queued request we don't need to re-start
                    // the target update
                    if let Some(target_number) = self
                        .queued_validated_headers
                        .first()
                        .filter(|h| h.hash() == tip)
                        .map(|h| h.number())
                    {
                        self.sync_target = Some(new_sync_target.with_number(target_number));
                        return
                    }

                    // a new tip request is needed to resolve the hash into a block number
                    trace!(target: "downloaders::headers", new=?target, "Request new sync target");
                    self.metrics.out_of_order_requests.increment(1);
                    self.sync_target = Some(new_sync_target);
                    self.sync_target_request = Some(
                        self.request_fut(self.get_sync_target_request(tip.into()), Priority::High),
                    );
                }
            }
            SyncTarget::Gap(existing) => {
                let target = existing.parent;
                if Some(target) != current_tip {
                    // there could be a sync target request in progress
                    self.sync_target_request.take();
                    // If the target has changed, update the request pointers based on the new
                    // targeted block number: the parent of the known gap block.
                    let parent_block_number = existing.block.number.saturating_sub(1);

                    trace!(target: "downloaders::headers", current=?current_tip, new=?target, %parent_block_number, "Updated sync target");

                    // Update the sync target hash, keeping an already-known number if present
                    self.sync_target = match self.sync_target.take() {
                        Some(sync_target) => Some(sync_target.with_hash(target)),
                        None => Some(SyncTargetBlock::from_hash(target)),
                    };
                    self.on_block_number_update(parent_block_number, parent_block_number);
                }
            }
            SyncTarget::TipNum(num) => {
                let current_tip_num = self.sync_target.as_ref().and_then(|t| t.number());
                if Some(num) != current_tip_num {
                    trace!(target: "downloaders::headers", %num, "Updating sync target based on num");
                    // just update the sync target; the hash will be resolved by the request below
                    self.sync_target = Some(SyncTargetBlock::from_number(num));
                    self.sync_target_request = Some(
                        self.request_fut(self.get_sync_target_request(num.into()), Priority::High),
                    );
                }
            }
        }
    }

    /// Sets how many validated headers the stream yields per batch.
    fn set_batch_size(&mut self, batch_size: usize) {
        self.stream_batch_size = batch_size;
    }
}
780
impl<H> Stream for ReverseHeadersDownloader<H>
where
    H: HeadersClient<Header: reth_primitives_traits::BlockHeader> + 'static,
{
    type Item = HeadersDownloaderResult<Vec<SealedHeader<H::Header>>, H::Header>;

    /// Drives the downloader: resolves a pending sync target first, then keeps the request
    /// queue at capacity and yields batches of validated headers (highest block first).
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();

        // The downloader boundaries (local head and sync target) have to be set in order
        // to start downloading data.
        if this.local_head.is_none() || this.sync_target.is_none() {
            trace!(
                target: "downloaders::headers",
                head=?this.local_block_number(),
                sync_target=?this.sync_target,
                "The downloader sync boundaries have not been set"
            );
            return Poll::Pending
        }

        // If we have a new tip request we need to complete that first before we send batched
        // requests
        while let Some(mut req) = this.sync_target_request.take() {
            match req.poll_unpin(cx) {
                Poll::Ready(outcome) => {
                    match this.on_sync_target_outcome(outcome) {
                        Ok(()) => break,
                        Err(ReverseHeadersDownloaderError::Response(error)) => {
                            trace!(target: "downloaders::headers", %error, "invalid sync target response");
                            if error.is_channel_closed() {
                                // download channel closed which means the network was dropped
                                return Poll::Ready(None)
                            }

                            // penalize the peer and retry the same request with high priority
                            this.penalize_peer(error.peer_id, &error.error);
                            this.metrics.increment_errors(&error.error);
                            this.sync_target_request =
                                Some(this.request_fut(error.request, Priority::High));
                        }
                        Err(ReverseHeadersDownloaderError::Downloader(error)) => {
                            this.clear();
                            return Poll::Ready(Some(Err(error)))
                        }
                    };
                }
                Poll::Pending => {
                    // put the in-flight request back and wait for it to resolve
                    this.sync_target_request = Some(req);
                    return Poll::Pending
                }
            }
        }

        // shrink the buffer after handling sync target outcomes
        this.buffered_responses.shrink_to_fit();

        // this loop will submit new requests and poll them, if a new batch is ready it is returned
        // The actual work is done by the receiver of the request channel, this means, polling the
        // request future is just reading from a `oneshot::Receiver`. Hence, this loop tries to keep
        // the downloader at capacity at all times. The order of loops is as follows:
        // 1. poll futures to make room for followup requests (this will also prepare validated
        //    headers for 3.)
        // 2. exhaust all capacity by sending requests
        // 3. return batch, if enough validated
        // 4. return Pending if 2.) did not submit a new request, else continue
        loop {
            // poll requests
            while let Poll::Ready(Some(outcome)) = this.in_progress_queue.poll_next_unpin(cx) {
                this.metrics.in_flight_requests.decrement(1.);
                // handle response
                match this.on_headers_outcome(outcome) {
                    Ok(()) => (),
                    Err(ReverseHeadersDownloaderError::Response(error)) => {
                        if error.is_channel_closed() {
                            // download channel closed which means the network was dropped
                            return Poll::Ready(None)
                        }
                        this.on_headers_error(error);
                    }
                    Err(ReverseHeadersDownloaderError::Downloader(error)) => {
                        this.clear();
                        return Poll::Ready(Some(Err(error)))
                    }
                };
            }

            // shrink the buffer after handling headers outcomes
            this.buffered_responses.shrink_to_fit();

            // marks the loop's exit condition: exit if no requests submitted
            let mut progress = false;

            let concurrent_request_limit = this.concurrent_request_limit();
            // populate requests, bounded by both the concurrency limit and the response buffer
            // capacity
            while this.in_progress_queue.len() < concurrent_request_limit &&
                this.buffered_responses.len() < this.max_buffered_responses
            {
                if let Some(request) = this.next_request() {
                    trace!(
                        target: "downloaders::headers",
                        "Requesting headers {request:?}"
                    );
                    progress = true;
                    this.submit_request(request, Priority::Normal);
                } else {
                    // no more requests
                    break
                }
            }

            // yield next batch
            if this.queued_validated_headers.len() >= this.stream_batch_size {
                let next_batch = this.split_next_batch();

                // Note: if this would drain all headers, we need to keep the lowest (last index)
                // around so we can continue validating headers responses.
                if this.queued_validated_headers.is_empty() {
                    this.lowest_validated_header = next_batch.last().cloned();
                }

                trace!(target: "downloaders::headers", batch=%next_batch.len(), "Returning validated batch");

                this.metrics.total_flushed.increment(next_batch.len() as u64);
                return Poll::Ready(Some(Ok(next_batch)))
            }

            if !progress {
                break
            }
        }

        // all requests are handled, stream is finished
        if this.in_progress_queue.is_empty() {
            let next_batch = this.split_next_batch();
            if next_batch.is_empty() {
                this.clear();
                return Poll::Ready(None)
            }
            this.metrics.total_flushed.increment(next_batch.len() as u64);
            return Poll::Ready(Some(Ok(next_batch)))
        }

        Poll::Pending
    }
}
925
926/// A future that returns a list of headers on success.
927#[derive(Debug)]
struct HeadersRequestFuture<F> {
    /// The request being executed; taken (set to `None`) once the inner future resolves.
    request: Option<HeadersRequest>,
    /// The inner future resolving to the peer's response.
    fut: F,
}
932
933impl<F, H> Future for HeadersRequestFuture<F>
934where
935    F: Future<Output = PeerRequestResult<Vec<H>>> + Sync + Send + Unpin,
936{
937    type Output = HeadersRequestOutcome<H>;
938
939    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
940        let this = self.get_mut();
941        let outcome = ready!(this.fut.poll_unpin(cx));
942        let request = this.request.take().unwrap();
943
944        Poll::Ready(HeadersRequestOutcome { request, outcome })
945    }
946}
947
948/// The outcome of the [`HeadersRequestFuture`]
struct HeadersRequestOutcome<H> {
    /// The request that was executed.
    request: HeadersRequest,
    /// The result received from the peer for that request.
    outcome: PeerRequestResult<Vec<H>>,
}
953
// === impl HeadersRequestOutcome ===
955
impl<H> HeadersRequestOutcome<H> {
    /// The block number the request started at.
    ///
    /// Requests issued by this downloader always start at a block number, never a hash,
    /// hence the `expect`.
    const fn block_number(&self) -> u64 {
        self.request.start.as_number().expect("is number")
    }
}
961
962/// Wrapper type to order responses
963#[derive(Debug)]
struct OrderedHeadersResponse<H> {
    /// The headers contained in the response.
    headers: Vec<H>,
    /// The request the response corresponds to.
    request: HeadersRequest,
    /// The peer that delivered the response.
    peer_id: PeerId,
}
969
970// === impl OrderedHeadersResponse ===
971
impl<H> OrderedHeadersResponse<H> {
    /// The block number the request started at; used as the ordering key.
    ///
    /// Requests issued by this downloader always start at a block number, never a hash,
    /// hence the `expect`.
    const fn block_number(&self) -> u64 {
        self.request.start.as_number().expect("is number")
    }
}
977
978impl<H> PartialEq for OrderedHeadersResponse<H> {
979    fn eq(&self, other: &Self) -> bool {
980        self.block_number() == other.block_number()
981    }
982}
983
// Equality is based solely on `u64` block numbers, so it is a total equivalence relation.
impl<H> Eq for OrderedHeadersResponse<H> {}
985
impl<H> PartialOrd for OrderedHeadersResponse<H> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        // Delegate to `Ord` so the two orderings can never disagree.
        Some(self.cmp(other))
    }
}
991
992impl<H> Ord for OrderedHeadersResponse<H> {
993    fn cmp(&self, other: &Self) -> Ordering {
994        self.block_number().cmp(&other.block_number())
995    }
996}
997
998/// Type returned if a bad response was processed
#[derive(Debug, Error)]
#[error("error requesting headers from peer {peer_id:?}: {error}; request: {request:?}")]
struct HeadersResponseError {
    /// The request that produced the bad response.
    request: HeadersRequest,
    /// The peer that sent the response, if the request reached a peer.
    peer_id: Option<PeerId>,
    /// The underlying download error.
    #[source]
    error: DownloadError,
}
1007
1008impl HeadersResponseError {
1009    /// Returns true if the error was caused by a closed channel to the network.
1010    const fn is_channel_closed(&self) -> bool {
1011        if let DownloadError::RequestError(ref err) = self.error {
1012            return err.is_channel_closed()
1013        }
1014        false
1015    }
1016}
1017
1018/// The block to which we want to close the gap: (local head...sync target]
1019/// This tracks the sync target block, so this could be either a block number or hash.
#[derive(Clone, Debug)]
pub enum SyncTargetBlock {
    /// Only the hash of the targeted block is known.
    Hash(B256),
    /// Only the number of the targeted block is known.
    Number(u64),
    /// Both the block hash and number of the targeted block are known.
    HashAndNumber {
        /// Block hash of the targeted block
        hash: B256,
        /// Block number of the targeted block
        number: u64,
    },
}
1034
1035impl SyncTargetBlock {
1036    /// Create new instance from hash.
1037    const fn from_hash(hash: B256) -> Self {
1038        Self::Hash(hash)
1039    }
1040
1041    /// Create new instance from number.
1042    const fn from_number(num: u64) -> Self {
1043        Self::Number(num)
1044    }
1045
1046    /// Set the hash for the sync target.
1047    const fn with_hash(self, hash: B256) -> Self {
1048        match self {
1049            Self::Hash(_) => Self::Hash(hash),
1050            Self::Number(number) | Self::HashAndNumber { number, .. } => {
1051                Self::HashAndNumber { hash, number }
1052            }
1053        }
1054    }
1055
1056    /// Set a number on the instance.
1057    const fn with_number(self, number: u64) -> Self {
1058        match self {
1059            Self::Hash(hash) | Self::HashAndNumber { hash, .. } => {
1060                Self::HashAndNumber { hash, number }
1061            }
1062            Self::Number(_) => Self::Number(number),
1063        }
1064    }
1065
1066    /// Replace the target block number, and return the old block number, if it was set.
1067    ///
1068    /// If the target block is a hash, this be converted into a `HashAndNumber`, but return `None`.
1069    /// The semantics should be equivalent to that of `Option::replace`.
1070    const fn replace_number(&mut self, number: u64) -> Option<u64> {
1071        match self {
1072            Self::Hash(hash) => {
1073                *self = Self::HashAndNumber { hash: *hash, number };
1074                None
1075            }
1076            Self::Number(old_number) => {
1077                let res = Some(*old_number);
1078                *self = Self::Number(number);
1079                res
1080            }
1081            Self::HashAndNumber { number: old_number, hash } => {
1082                let res = Some(*old_number);
1083                *self = Self::HashAndNumber { hash: *hash, number };
1084                res
1085            }
1086        }
1087    }
1088
1089    /// Return the hash of the target block, if it is set.
1090    const fn hash(&self) -> Option<B256> {
1091        match self {
1092            Self::Hash(hash) | Self::HashAndNumber { hash, .. } => Some(*hash),
1093            Self::Number(_) => None,
1094        }
1095    }
1096
1097    /// Return the block number of the sync target, if it is set.
1098    const fn number(&self) -> Option<u64> {
1099        match self {
1100            Self::Hash(_) => None,
1101            Self::Number(number) | Self::HashAndNumber { number, .. } => Some(*number),
1102        }
1103    }
1104}
1105
1106/// The builder for [`ReverseHeadersDownloader`] with
1107/// some default settings
#[derive(Debug)]
pub struct ReverseHeadersDownloaderBuilder {
    /// The batch size per one request
    request_limit: u64,
    /// The number of headers the stream yields per batch
    stream_batch_size: usize,
    /// Minimum number of concurrent requests to keep active
    min_concurrent_requests: usize,
    /// Maximum number of concurrent requests
    max_concurrent_requests: usize,
    /// How many responses to buffer
    max_buffered_responses: usize,
}
1121
1122impl ReverseHeadersDownloaderBuilder {
1123    /// Creates a new [`ReverseHeadersDownloaderBuilder`] with configurations based on the provided
1124    /// [`HeadersConfig`].
1125    pub fn new(config: HeadersConfig) -> Self {
1126        Self::default()
1127            .request_limit(config.downloader_request_limit)
1128            .min_concurrent_requests(config.downloader_min_concurrent_requests)
1129            .max_concurrent_requests(config.downloader_max_concurrent_requests)
1130            .max_buffered_responses(config.downloader_max_buffered_responses)
1131            .stream_batch_size(config.commit_threshold as usize)
1132    }
1133}
1134
1135impl Default for ReverseHeadersDownloaderBuilder {
1136    fn default() -> Self {
1137        Self {
1138            stream_batch_size: 10_000,
1139            // This is just below the max number of headers commonly in a headers response (1024), see also <https://github.com/ethereum/go-ethereum/blob/b0d44338bbcefee044f1f635a84487cbbd8f0538/eth/protocols/eth/handler.go#L38-L40>
1140            // with ~500bytes per header this around 0.5MB per request max
1141            request_limit: 1_000,
1142            max_concurrent_requests: 100,
1143            min_concurrent_requests: 5,
1144            max_buffered_responses: 100,
1145        }
1146    }
1147}
1148
impl ReverseHeadersDownloaderBuilder {
    /// Set the request batch size.
    ///
    /// This determines the `limit` for a `GetBlockHeaders` requests, the number of headers we ask
    /// for.
    pub const fn request_limit(mut self, limit: u64) -> Self {
        self.request_limit = limit;
        self
    }

    /// Set the stream batch size
    ///
    /// This determines the number of headers the [`ReverseHeadersDownloader`] will yield on
    /// `Stream::next`. This will be the amount of headers the headers stage will commit at a
    /// time.
    pub const fn stream_batch_size(mut self, size: usize) -> Self {
        self.stream_batch_size = size;
        self
    }

    /// Set the min amount of concurrent requests.
    ///
    /// If there's capacity the [`ReverseHeadersDownloader`] will keep at least this many requests
    /// active at a time.
    pub const fn min_concurrent_requests(mut self, min_concurrent_requests: usize) -> Self {
        self.min_concurrent_requests = min_concurrent_requests;
        self
    }

    /// Set the max amount of concurrent requests.
    ///
    /// The downloader's concurrent requests won't exceed the given amount.
    pub const fn max_concurrent_requests(mut self, max_concurrent_requests: usize) -> Self {
        self.max_concurrent_requests = max_concurrent_requests;
        self
    }

    /// How many responses to buffer internally.
    ///
    /// This essentially determines how much memory the downloader can use for buffering responses
    /// that arrive out of order. The total number of buffered headers is `request_limit *
    /// max_buffered_responses`. If the [`ReverseHeadersDownloader`]'s buffered responses exceeds
    /// this threshold it waits until there's capacity again before sending new requests.
    pub const fn max_buffered_responses(mut self, max_buffered_responses: usize) -> Self {
        self.max_buffered_responses = max_buffered_responses;
        self
    }

    /// Build [`ReverseHeadersDownloader`] with provided consensus
    /// and header client implementations
    ///
    /// The returned downloader yields nothing until both the local head and the sync target
    /// have been set via the [`HeaderDownloader`] trait methods.
    pub fn build<H>(
        self,
        client: H,
        consensus: Arc<dyn HeaderValidator<H::Header>>,
    ) -> ReverseHeadersDownloader<H>
    where
        H: HeadersClient + 'static,
    {
        let Self {
            request_limit,
            stream_batch_size,
            min_concurrent_requests,
            max_concurrent_requests,
            max_buffered_responses,
        } = self;
        ReverseHeadersDownloader {
            consensus,
            client: Arc::new(client),
            local_head: None,
            sync_target: None,
            // Note: we set these to `0` first, they'll be updated once the sync target response is
            // handled and only used afterwards
            next_request_block_number: 0,
            next_chain_tip_block_number: 0,
            lowest_validated_header: None,
            request_limit,
            min_concurrent_requests,
            max_concurrent_requests,
            stream_batch_size,
            max_buffered_responses,
            sync_target_request: None,
            in_progress_queue: Default::default(),
            buffered_responses: Default::default(),
            queued_validated_headers: Default::default(),
            metrics: Default::default(),
        }
    }
}
1237
1238/// Configures and returns the next [`HeadersRequest`] based on the given parameters
1239///
1240/// The request will start at the given `next_request_block_number` block.
1241/// The `limit` of the request will either be the targeted `request_limit` or the difference of
1242/// `next_request_block_number` and the `local_head` in case this is smaller than the targeted
1243/// `request_limit`.
1244#[inline]
1245fn calc_next_request(
1246    local_head: u64,
1247    next_request_block_number: u64,
1248    request_limit: u64,
1249) -> HeadersRequest {
1250    // downloading is in reverse
1251    let diff = next_request_block_number - local_head;
1252    let limit = diff.min(request_limit);
1253    let start = next_request_block_number;
1254    HeadersRequest::falling(start.into(), limit)
1255}
1256
1257#[cfg(test)]
mod tests {
    use super::*;
    use crate::headers::test_utils::child_header;
    use alloy_consensus::Header;
    use alloy_eips::{eip1898::BlockWithParent, BlockNumHash};
    use assert_matches::assert_matches;
    use reth_consensus::test_utils::TestConsensus;
    use reth_network_p2p::test_utils::TestHeadersClient;

    /// Tests that `replace_number` works the same way as `Option::replace`
    #[test]
    fn test_replace_number_semantics() {
        struct Fixture {
            // input fields (both SyncTargetBlock and Option<u64>)
            sync_target_block: SyncTargetBlock,
            sync_target_option: Option<u64>,

            // option to replace
            replace_number: u64,

            // expected method result
            expected_result: Option<u64>,

            // output state
            new_number: u64,
        }

        let fixtures = vec![
            Fixture {
                sync_target_block: SyncTargetBlock::Hash(B256::random()),
                // Hash maps to None here, all other variants map to Some
                sync_target_option: None,
                replace_number: 1,
                expected_result: None,
                new_number: 1,
            },
            Fixture {
                sync_target_block: SyncTargetBlock::Number(1),
                sync_target_option: Some(1),
                replace_number: 2,
                expected_result: Some(1),
                new_number: 2,
            },
            Fixture {
                sync_target_block: SyncTargetBlock::HashAndNumber {
                    hash: B256::random(),
                    number: 1,
                },
                sync_target_option: Some(1),
                replace_number: 2,
                expected_result: Some(1),
                new_number: 2,
            },
        ];

        for fixture in fixtures {
            let mut sync_target_block = fixture.sync_target_block;
            let result = sync_target_block.replace_number(fixture.replace_number);
            assert_eq!(result, fixture.expected_result);
            assert_eq!(sync_target_block.number(), Some(fixture.new_number));

            let mut sync_target_option = fixture.sync_target_option;
            let option_result = sync_target_option.replace(fixture.replace_number);
            assert_eq!(option_result, fixture.expected_result);
            assert_eq!(sync_target_option, Some(fixture.new_number));
        }
    }

    /// Tests that updating the sync target issues a new sync target request when the tip
    /// changes, and resolves a gap target without issuing a new request.
    #[test]
    fn test_sync_target_update() {
        let client = Arc::new(TestHeadersClient::default());

        let genesis = SealedHeader::default();

        let mut downloader = ReverseHeadersDownloaderBuilder::default()
            .build(Arc::clone(&client), Arc::new(TestConsensus::default()));
        downloader.update_local_head(genesis);
        downloader.update_sync_target(SyncTarget::Tip(B256::random()));

        downloader.sync_target_request.take();

        let target = SyncTarget::Tip(B256::random());
        downloader.update_sync_target(target);
        assert!(downloader.sync_target_request.is_some());

        downloader.sync_target_request.take();
        let target = SyncTarget::Gap(BlockWithParent {
            block: BlockNumHash::new(0, B256::random()),
            parent: Default::default(),
        });
        downloader.update_sync_target(target);
        assert!(downloader.sync_target_request.is_none());
        assert_matches!(
            downloader.sync_target,
            Some(target) => target.number().is_some()
        );
    }

    /// Tests that updating the local head prunes queued headers at or below the new head.
    #[test]
    fn test_head_update() {
        let client = Arc::new(TestHeadersClient::default());

        let header: SealedHeader = SealedHeader::default();

        let mut downloader = ReverseHeadersDownloaderBuilder::default()
            .build(Arc::clone(&client), Arc::new(TestConsensus::default()));
        downloader.update_local_head(header.clone());
        downloader.update_sync_target(SyncTarget::Tip(B256::random()));

        downloader.queued_validated_headers.push(header.clone());
        let mut next = header.as_ref().clone();
        next.number += 1;
        downloader.update_local_head(SealedHeader::new(next, B256::random()));
        assert!(downloader.queued_validated_headers.is_empty());
    }

    #[test]
    fn test_request_calc() {
        // request an entire batch
        let local = 0;
        let next = 1000;
        let batch_size = 2;
        let request = calc_next_request(local, next, batch_size);
        assert_eq!(request.start, next.into());
        assert_eq!(request.limit, batch_size);

        // only request 1
        let local = 999;
        let next = 1000;
        let batch_size = 2;
        let request = calc_next_request(local, next, batch_size);
        assert_eq!(request.start, next.into());
        assert_eq!(request.limit, 1);
    }

    /// Tests that successive requests walk down from the start block to the local head.
    #[test]
    fn test_next_request() {
        let client = Arc::new(TestHeadersClient::default());

        let genesis = SealedHeader::default();

        let batch_size = 99;
        let start = 1000;
        let mut downloader = ReverseHeadersDownloaderBuilder::default()
            .request_limit(batch_size)
            .build(Arc::clone(&client), Arc::new(TestConsensus::default()));
        downloader.update_local_head(genesis);
        downloader.update_sync_target(SyncTarget::Tip(B256::random()));

        downloader.next_request_block_number = start;

        let mut total = 0;
        while let Some(req) = downloader.next_request() {
            assert_eq!(req.start, (start - total).into());
            total += req.limit;
        }
        assert_eq!(total, start);
        assert_eq!(Some(downloader.next_request_block_number), downloader.local_block_number());
    }

    #[test]
    fn test_resp_order() {
        let mut heap = BinaryHeap::new();
        let hi = 1u64;
        heap.push(OrderedHeadersResponse::<Header> {
            headers: vec![],
            request: HeadersRequest { start: hi.into(), limit: 0, direction: Default::default() },
            peer_id: Default::default(),
        });

        let lo = 0u64;
        heap.push(OrderedHeadersResponse {
            headers: vec![],
            request: HeadersRequest { start: lo.into(), limit: 0, direction: Default::default() },
            peer_id: Default::default(),
        });

        // max-heap: the response with the higher starting block pops first
        assert_eq!(heap.pop().unwrap().block_number(), hi);
        assert_eq!(heap.pop().unwrap().block_number(), lo);
    }

    #[tokio::test]
    async fn download_at_fork_head() {
        reth_tracing::init_test_tracing();

        let client = Arc::new(TestHeadersClient::default());

        let p3 = SealedHeader::default();
        let p2 = child_header(&p3);
        let p1 = child_header(&p2);
        let p0 = child_header(&p1);

        let mut downloader = ReverseHeadersDownloaderBuilder::default()
            .stream_batch_size(3)
            .request_limit(3)
            .build(Arc::clone(&client), Arc::new(TestConsensus::default()));
        downloader.update_local_head(p3.clone());
        downloader.update_sync_target(SyncTarget::Tip(p0.hash()));

        client
            .extend(vec![
                p0.as_ref().clone(),
                p1.as_ref().clone(),
                p2.as_ref().clone(),
                p3.as_ref().clone(),
            ])
            .await;

        let headers = downloader.next().await.unwrap();
        assert_eq!(headers, Ok(vec![p0, p1, p2,]));
        assert!(downloader.buffered_responses.is_empty());
        assert!(downloader.next().await.is_none());
        assert!(downloader.next().await.is_none());
    }

    #[tokio::test]
    async fn download_one_by_one() {
        reth_tracing::init_test_tracing();
        let p3 = SealedHeader::default();
        let p2 = child_header(&p3);
        let p1 = child_header(&p2);
        let p0 = child_header(&p1);

        let client = Arc::new(TestHeadersClient::default());
        let mut downloader = ReverseHeadersDownloaderBuilder::default()
            .stream_batch_size(1)
            .request_limit(1)
            .build(Arc::clone(&client), Arc::new(TestConsensus::default()));
        downloader.update_local_head(p3.clone());
        downloader.update_sync_target(SyncTarget::Tip(p0.hash()));

        client
            .extend(vec![
                p0.as_ref().clone(),
                p1.as_ref().clone(),
                p2.as_ref().clone(),
                p3.as_ref().clone(),
            ])
            .await;

        let headers = downloader.next().await.unwrap();
        assert_eq!(headers, Ok(vec![p0]));
        let headers = headers.unwrap();
        assert_eq!(headers.capacity(), headers.len());

        let headers = downloader.next().await.unwrap();
        assert_eq!(headers, Ok(vec![p1]));
        let headers = headers.unwrap();
        assert_eq!(headers.capacity(), headers.len());

        let headers = downloader.next().await.unwrap();
        assert_eq!(headers, Ok(vec![p2]));
        let headers = headers.unwrap();
        assert_eq!(headers.capacity(), headers.len());

        assert!(downloader.next().await.is_none());
    }

    #[tokio::test]
    async fn download_one_by_one_larger_request_limit() {
        reth_tracing::init_test_tracing();
        let p3 = SealedHeader::default();
        let p2 = child_header(&p3);
        let p1 = child_header(&p2);
        let p0 = child_header(&p1);

        let client = Arc::new(TestHeadersClient::default());
        let mut downloader = ReverseHeadersDownloaderBuilder::default()
            .stream_batch_size(1)
            .request_limit(3)
            .build(Arc::clone(&client), Arc::new(TestConsensus::default()));
        downloader.update_local_head(p3.clone());
        downloader.update_sync_target(SyncTarget::Tip(p0.hash()));

        client
            .extend(vec![
                p0.as_ref().clone(),
                p1.as_ref().clone(),
                p2.as_ref().clone(),
                p3.as_ref().clone(),
            ])
            .await;

        let headers = downloader.next().await.unwrap();
        assert_eq!(headers, Ok(vec![p0]));
        let headers = headers.unwrap();
        assert_eq!(headers.capacity(), headers.len());

        let headers = downloader.next().await.unwrap();
        assert_eq!(headers, Ok(vec![p1]));
        let headers = headers.unwrap();
        assert_eq!(headers.capacity(), headers.len());

        let headers = downloader.next().await.unwrap();
        assert_eq!(headers, Ok(vec![p2]));
        let headers = headers.unwrap();
        assert_eq!(headers.capacity(), headers.len());

        assert!(downloader.next().await.is_none());
    }
}