1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
use std::{collections::HashMap, io, path::Path};

use alloy_primitives::{BlockHash, BlockNumber, B256};
use futures::Future;
use itertools::Either;
use reth_network_p2p::{
    bodies::client::{BodiesClient, BodiesFut},
    download::DownloadClient,
    error::RequestError,
    headers::client::{HeadersClient, HeadersDirection, HeadersFut, HeadersRequest},
    priority::Priority,
};
use reth_network_peers::PeerId;
use reth_primitives::{BlockBody, BlockHashOrNumber, Header, SealedHeader};
use thiserror::Error;
use tokio::{fs::File, io::AsyncReadExt};
use tokio_stream::StreamExt;
use tokio_util::codec::FramedRead;
use tracing::{debug, trace, warn};

use crate::receipt_file_client::FromReceiptReader;

use super::file_codec::BlockFileCodec;

/// Default byte length of chunk to read from chain file.
///
/// Default is 1 GB.
pub const DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE: u64 = 1_000_000_000;

/// Front-end API for fetching chain data from a file.
///
/// Blocks are assumed to be written one after another in a file, as rlp bytes.
///
/// For example, if the file contains 3 blocks, the file is assumed to be encoded as follows:
/// rlp(block1) || rlp(block2) || rlp(block3)
///
/// Blocks are assumed to have populated transactions, so reading headers will also buffer
/// transactions in memory for use in the bodies stage.
///
/// This reads the entire file into memory, so it is not suitable for large files.
#[derive(Debug)]
pub struct FileClient {
    /// The buffered headers retrieved when fetching new bodies.
    headers: HashMap<BlockNumber, Header>,

    /// A mapping between block hash and number.
    hash_to_number: HashMap<BlockHash, BlockNumber>,

    /// The buffered bodies retrieved when fetching new headers.
    bodies: HashMap<BlockHash, BlockBody>,
}

/// An error that can occur when constructing and using a [`FileClient`].
#[derive(Debug, Error)]
pub enum FileClientError {
    /// An error occurred when opening or reading the file.
    #[error(transparent)]
    Io(#[from] std::io::Error),

    /// An error occurred when decoding blocks, headers, or rlp headers from the file.
    #[error("{0}")]
    Rlp(alloy_rlp::Error, Vec<u8>),

    /// Custom error message.
    #[error("{0}")]
    Custom(&'static str),
}

impl From<&'static str> for FileClientError {
    fn from(value: &'static str) -> Self {
        Self::Custom(value)
    }
}

impl FileClient {
    /// Create a new file client from a file path.
    pub async fn new<P: AsRef<Path>>(path: P) -> Result<Self, FileClientError> {
        let file = File::open(path).await?;
        Self::from_file(file).await
    }

    /// Initialize the [`FileClient`] with a file directly.
    pub(crate) async fn from_file(mut file: File) -> Result<Self, FileClientError> {
        // get file len from metadata before reading
        let metadata = file.metadata().await?;
        let file_len = metadata.len();

        let mut reader = vec![];
        file.read_to_end(&mut reader).await?;

        Ok(Self::from_reader(&reader[..], file_len).await?.file_client)
    }

    /// Get the tip hash of the chain.
    pub fn tip(&self) -> Option<B256> {
        self.headers.get(&self.max_block()?).map(|h| h.hash_slow())
    }

    /// Get the start hash of the chain.
    pub fn start(&self) -> Option<B256> {
        self.headers.get(&self.min_block()?).map(|h| h.hash_slow())
    }

    /// Returns the highest block number of this client has or `None` if empty
    pub fn max_block(&self) -> Option<u64> {
        self.headers.keys().max().copied()
    }

    /// Returns the lowest block number of this client has or `None` if empty
    pub fn min_block(&self) -> Option<u64> {
        self.headers.keys().min().copied()
    }

    /// Clones and returns the highest header of this client has or `None` if empty. Seals header
    /// before returning.
    pub fn tip_header(&self) -> Option<SealedHeader> {
        self.headers.get(&self.max_block()?).map(|h| h.clone().seal_slow())
    }

    /// Returns true if all blocks are canonical (no gaps)
    pub fn has_canonical_blocks(&self) -> bool {
        if self.headers.is_empty() {
            return true
        }
        let mut nums = self.headers.keys().copied().collect::<Vec<_>>();
        nums.sort_unstable();
        let mut iter = nums.into_iter();
        let mut lowest = iter.next().expect("not empty");
        for next in iter {
            if next != lowest + 1 {
                return false
            }
            lowest = next;
        }
        true
    }

    /// Use the provided bodies as the file client's block body buffer.
    pub fn with_bodies(mut self, bodies: HashMap<BlockHash, BlockBody>) -> Self {
        self.bodies = bodies;
        self
    }

    /// Use the provided headers as the file client's block body buffer.
    pub fn with_headers(mut self, headers: HashMap<BlockNumber, Header>) -> Self {
        self.headers = headers;
        for (number, header) in &self.headers {
            self.hash_to_number.insert(header.hash_slow(), *number);
        }
        self
    }

    /// Returns the current number of headers in the client.
    pub fn headers_len(&self) -> usize {
        self.headers.len()
    }

    /// Returns the current number of bodies in the client.
    pub fn bodies_len(&self) -> usize {
        self.bodies.len()
    }

    /// Returns an iterator over headers in the client.
    pub fn headers_iter(&self) -> impl Iterator<Item = &Header> {
        self.headers.values()
    }

    /// Returns a mutable iterator over bodies in the client.
    ///
    /// Panics, if file client headers and bodies are not mapping 1-1.
    pub fn bodies_iter_mut(&mut self) -> impl Iterator<Item = (u64, &mut BlockBody)> {
        let bodies = &mut self.bodies;
        let numbers = &self.hash_to_number;
        bodies.iter_mut().map(|(hash, body)| (numbers[hash], body))
    }

    /// Returns the current number of transactions in the client.
    pub fn total_transactions(&self) -> usize {
        self.bodies.iter().fold(0, |acc, (_, body)| acc + body.transactions.len())
    }
}

impl FromReader for FileClient {
    type Error = FileClientError;

    /// Initialize the [`FileClient`] from bytes that have been read from file.
    fn from_reader<B>(
        reader: B,
        num_bytes: u64,
    ) -> impl Future<Output = Result<DecodedFileChunk<Self>, Self::Error>>
    where
        B: AsyncReadExt + Unpin,
    {
        let mut headers = HashMap::new();
        let mut hash_to_number = HashMap::new();
        let mut bodies = HashMap::new();

        // use with_capacity to make sure the internal buffer contains the entire chunk
        let mut stream = FramedRead::with_capacity(reader, BlockFileCodec, num_bytes as usize);

        trace!(target: "downloaders::file",
            target_num_bytes=num_bytes,
            capacity=stream.read_buffer().capacity(),
            "init decode stream"
        );

        let mut remaining_bytes = vec![];

        let mut log_interval = 0;
        let mut log_interval_start_block = 0;

        async move {
            while let Some(block_res) = stream.next().await {
                let block = match block_res {
                    Ok(block) => block,
                    Err(FileClientError::Rlp(err, bytes)) => {
                        trace!(target: "downloaders::file",
                            %err,
                            bytes_len=bytes.len(),
                            "partial block returned from decoding chunk"
                        );
                        remaining_bytes = bytes;
                        break
                    }
                    Err(err) => return Err(err),
                };
                let block_number = block.header.number;
                let block_hash = block.header.hash_slow();

                // add to the internal maps
                headers.insert(block.header.number, block.header.clone());
                hash_to_number.insert(block_hash, block.header.number);
                bodies.insert(block_hash, block.into());

                if log_interval == 0 {
                    trace!(target: "downloaders::file",
                        block_number,
                        "read first block"
                    );
                    log_interval_start_block = block_number;
                } else if log_interval % 100_000 == 0 {
                    trace!(target: "downloaders::file",
                        blocks=?log_interval_start_block..=block_number,
                        "read blocks from file"
                    );
                    log_interval_start_block = block_number + 1;
                }
                log_interval += 1;
            }

            trace!(target: "downloaders::file", blocks = headers.len(), "Initialized file client");

            Ok(DecodedFileChunk {
                file_client: Self { headers, hash_to_number, bodies },
                remaining_bytes,
                highest_block: None,
            })
        }
    }
}

impl HeadersClient for FileClient {
    type Output = HeadersFut;

    fn get_headers_with_priority(
        &self,
        request: HeadersRequest,
        _priority: Priority,
    ) -> Self::Output {
        // this just searches the buffer, and fails if it can't find the header
        let mut headers = Vec::new();
        trace!(target: "downloaders::file", request=?request, "Getting headers");

        let start_num = match request.start {
            BlockHashOrNumber::Hash(hash) => match self.hash_to_number.get(&hash) {
                Some(num) => *num,
                None => {
                    warn!(%hash, "Could not find starting block number for requested header hash");
                    return Box::pin(async move { Err(RequestError::BadResponse) })
                }
            },
            BlockHashOrNumber::Number(num) => num,
        };

        let range = if request.limit == 1 {
            Either::Left(start_num..start_num + 1)
        } else {
            match request.direction {
                HeadersDirection::Rising => Either::Left(start_num..start_num + request.limit),
                HeadersDirection::Falling => {
                    Either::Right((start_num - request.limit + 1..=start_num).rev())
                }
            }
        };

        trace!(target: "downloaders::file", range=?range, "Getting headers with range");

        for block_number in range {
            match self.headers.get(&block_number).cloned() {
                Some(header) => headers.push(header),
                None => {
                    warn!(number=%block_number, "Could not find header");
                    return Box::pin(async move { Err(RequestError::BadResponse) })
                }
            }
        }

        Box::pin(async move { Ok((PeerId::default(), headers).into()) })
    }
}

impl BodiesClient for FileClient {
    type Output = BodiesFut;

    fn get_block_bodies_with_priority(
        &self,
        hashes: Vec<B256>,
        _priority: Priority,
    ) -> Self::Output {
        // this just searches the buffer, and fails if it can't find the block
        let mut bodies = Vec::new();

        // check if any are an error
        // could unwrap here
        for hash in hashes {
            match self.bodies.get(&hash).cloned() {
                Some(body) => bodies.push(body),
                None => return Box::pin(async move { Err(RequestError::BadResponse) }),
            }
        }

        Box::pin(async move { Ok((PeerId::default(), bodies).into()) })
    }
}

impl DownloadClient for FileClient {
    fn report_bad_message(&self, _peer_id: PeerId) {
        warn!("Reported a bad message on a file client, the file may be corrupted or invalid");
        // noop
    }

    fn num_connected_peers(&self) -> usize {
        // no such thing as connected peers when we are just using a file
        1
    }
}

/// Chunks file into several [`FileClient`]s.
#[derive(Debug)]
pub struct ChunkedFileReader {
    /// File to read from.
    file: File,
    /// Current file byte length.
    file_byte_len: u64,
    /// Bytes that have been read.
    chunk: Vec<u8>,
    /// Max bytes per chunk.
    chunk_byte_len: u64,
    /// Optionally, tracks highest decoded block number. Needed when decoding data that maps * to 1
    /// with block number
    highest_block: Option<u64>,
}

impl ChunkedFileReader {
    /// Returns the remaining file length.
    pub const fn file_len(&self) -> u64 {
        self.file_byte_len
    }

    /// Opens the file to import from given path. Returns a new instance. If no chunk byte length
    /// is passed, chunks have [`DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE`] (one static file).
    pub async fn new<P: AsRef<Path>>(
        path: P,
        chunk_byte_len: Option<u64>,
    ) -> Result<Self, FileClientError> {
        let file = File::open(path).await?;
        let chunk_byte_len = chunk_byte_len.unwrap_or(DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE);

        Self::from_file(file, chunk_byte_len).await
    }

    /// Opens the file to import from given path. Returns a new instance.
    pub async fn from_file(file: File, chunk_byte_len: u64) -> Result<Self, FileClientError> {
        // get file len from metadata before reading
        let metadata = file.metadata().await?;
        let file_byte_len = metadata.len();

        Ok(Self { file, file_byte_len, chunk: vec![], chunk_byte_len, highest_block: None })
    }

    /// Calculates the number of bytes to read from the chain file. Returns a tuple of the chunk
    /// length and the remaining file length.
    fn chunk_len(&self) -> u64 {
        let Self { chunk_byte_len, file_byte_len, .. } = *self;
        let file_byte_len = file_byte_len + self.chunk.len() as u64;

        if chunk_byte_len > file_byte_len {
            // last chunk
            file_byte_len
        } else {
            chunk_byte_len
        }
    }

    /// Reads bytes from file and buffers as next chunk to decode. Returns byte length of next
    /// chunk to read.
    async fn read_next_chunk(&mut self) -> Result<Option<u64>, io::Error> {
        if self.file_byte_len == 0 && self.chunk.is_empty() {
            // eof
            return Ok(None)
        }

        let chunk_target_len = self.chunk_len();
        let old_bytes_len = self.chunk.len() as u64;

        // calculate reserved space in chunk
        let new_read_bytes_target_len = chunk_target_len - old_bytes_len;

        // read new bytes from file
        let prev_read_bytes_len = self.chunk.len();
        self.chunk.extend(std::iter::repeat(0).take(new_read_bytes_target_len as usize));
        let reader = &mut self.chunk[prev_read_bytes_len..];

        // actual bytes that have been read
        let new_read_bytes_len = self.file.read_exact(reader).await? as u64;
        let next_chunk_byte_len = self.chunk.len();

        // update remaining file length
        self.file_byte_len -= new_read_bytes_len;

        debug!(target: "downloaders::file",
            max_chunk_byte_len=self.chunk_byte_len,
            prev_read_bytes_len,
            new_read_bytes_target_len,
            new_read_bytes_len,
            next_chunk_byte_len,
            remaining_file_byte_len=self.file_byte_len,
            "new bytes were read from file"
        );

        Ok(Some(next_chunk_byte_len as u64))
    }

    /// Read next chunk from file. Returns [`FileClient`] containing decoded chunk.
    pub async fn next_chunk<T>(&mut self) -> Result<Option<T>, T::Error>
    where
        T: FromReader,
    {
        let Some(next_chunk_byte_len) = self.read_next_chunk().await? else { return Ok(None) };

        // make new file client from chunk
        let DecodedFileChunk { file_client, remaining_bytes, .. } =
            T::from_reader(&self.chunk[..], next_chunk_byte_len).await?;

        // save left over bytes
        self.chunk = remaining_bytes;

        Ok(Some(file_client))
    }

    /// Read next chunk from file. Returns [`FileClient`] containing decoded chunk.
    pub async fn next_receipts_chunk<T, D>(&mut self) -> Result<Option<T>, T::Error>
    where
        T: FromReceiptReader<D>,
    {
        let Some(next_chunk_byte_len) = self.read_next_chunk().await? else { return Ok(None) };

        // make new file client from chunk
        let DecodedFileChunk { file_client, remaining_bytes, highest_block } =
            T::from_receipt_reader(&self.chunk[..], next_chunk_byte_len, self.highest_block)
                .await?;

        // save left over bytes
        self.chunk = remaining_bytes;
        // update highest block
        self.highest_block = highest_block;

        Ok(Some(file_client))
    }
}

/// Constructs a file client from a reader.
pub trait FromReader {
    /// Error returned by file client type.
    type Error: From<io::Error>;

    /// Returns a file client
    fn from_reader<B>(
        reader: B,
        num_bytes: u64,
    ) -> impl Future<Output = Result<DecodedFileChunk<Self>, Self::Error>>
    where
        Self: Sized,
        B: AsyncReadExt + Unpin;
}

/// Output from decoding a file chunk with [`FromReader::from_reader`].
#[derive(Debug)]
pub struct DecodedFileChunk<T> {
    /// File client, i.e. the decoded part of chunk.
    pub file_client: T,
    /// Remaining bytes that have not been decoded, e.g. a partial block or a partial receipt.
    pub remaining_bytes: Vec<u8>,
    /// Highest block of decoded chunk. This is needed when decoding data that maps * to 1 with
    /// block number, like receipts.
    pub highest_block: Option<u64>,
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        bodies::{
            bodies::BodiesDownloaderBuilder,
            test_utils::{insert_headers, zip_blocks},
        },
        headers::{reverse_headers::ReverseHeadersDownloaderBuilder, test_utils::child_header},
        test_utils::{generate_bodies, generate_bodies_file},
    };
    use assert_matches::assert_matches;
    use futures_util::stream::StreamExt;
    use rand::Rng;
    use reth_consensus::test_utils::TestConsensus;
    use reth_network_p2p::{
        bodies::downloader::BodyDownloader,
        headers::downloader::{HeaderDownloader, SyncTarget},
    };
    use reth_provider::test_utils::create_test_provider_factory;
    use std::sync::Arc;

    #[tokio::test]
    async fn streams_bodies_from_buffer() {
        // Generate some random blocks
        let factory = create_test_provider_factory();
        let (headers, mut bodies) = generate_bodies(0..=19);

        insert_headers(factory.db_ref().db(), &headers);

        // create an empty file
        let file = tempfile::tempfile().unwrap();

        let client =
            Arc::new(FileClient::from_file(file.into()).await.unwrap().with_bodies(bodies.clone()));
        let mut downloader = BodiesDownloaderBuilder::default().build(
            client.clone(),
            Arc::new(TestConsensus::default()),
            factory,
        );
        downloader.set_download_range(0..=19).expect("failed to set download range");

        assert_matches!(
            downloader.next().await,
            Some(Ok(res)) => assert_eq!(res, zip_blocks(headers.iter(), &mut bodies))
        );
    }

    #[tokio::test]
    async fn download_headers_at_fork_head() {
        reth_tracing::init_test_tracing();

        let p3 = SealedHeader::default();
        let p2 = child_header(&p3);
        let p1 = child_header(&p2);
        let p0 = child_header(&p1);

        let file = tempfile::tempfile().unwrap();
        let client = Arc::new(FileClient::from_file(file.into()).await.unwrap().with_headers(
            HashMap::from([
                (0u64, p0.clone().unseal()),
                (1, p1.clone().unseal()),
                (2, p2.clone().unseal()),
                (3, p3.clone().unseal()),
            ]),
        ));

        let mut downloader = ReverseHeadersDownloaderBuilder::default()
            .stream_batch_size(3)
            .request_limit(3)
            .build(Arc::clone(&client), Arc::new(TestConsensus::default()));
        downloader.update_local_head(p3.clone());
        downloader.update_sync_target(SyncTarget::Tip(p0.hash()));

        let headers = downloader.next().await.unwrap();
        assert_eq!(headers, Ok(vec![p0, p1, p2]));
        assert!(downloader.next().await.is_none());
        assert!(downloader.next().await.is_none());
    }

    #[tokio::test]
    async fn test_download_headers_from_file() {
        reth_tracing::init_test_tracing();

        // Generate some random blocks
        let (file, headers, _) = generate_bodies_file(0..=19).await;
        // now try to read them back
        let client = Arc::new(FileClient::from_file(file).await.unwrap());

        // construct headers downloader and use first header
        let mut header_downloader = ReverseHeadersDownloaderBuilder::default()
            .build(Arc::clone(&client), Arc::new(TestConsensus::default()));
        header_downloader.update_local_head(headers.first().unwrap().clone());
        header_downloader.update_sync_target(SyncTarget::Tip(headers.last().unwrap().hash()));

        // get headers first
        let mut downloaded_headers = header_downloader.next().await.unwrap().unwrap();

        // reverse to make sure it's in the right order before comparing
        downloaded_headers.reverse();

        // the first header is not included in the response
        assert_eq!(downloaded_headers, headers[1..]);
    }

    #[tokio::test]
    async fn test_download_bodies_from_file() {
        // Generate some random blocks
        let factory = create_test_provider_factory();
        let (file, headers, mut bodies) = generate_bodies_file(0..=19).await;

        // now try to read them back
        let client = Arc::new(FileClient::from_file(file).await.unwrap());

        // insert headers in db for the bodies downloader
        insert_headers(factory.db_ref().db(), &headers);

        let mut downloader = BodiesDownloaderBuilder::default().build(
            client.clone(),
            Arc::new(TestConsensus::default()),
            factory,
        );
        downloader.set_download_range(0..=19).expect("failed to set download range");

        assert_matches!(
            downloader.next().await,
            Some(Ok(res)) => assert_eq!(res, zip_blocks(headers.iter(), &mut bodies))
        );
    }

    #[tokio::test]
    async fn test_chunk_download_headers_from_file() {
        reth_tracing::init_test_tracing();

        // Generate some random blocks
        let (file, headers, _) = generate_bodies_file(0..=14).await;

        // calculate min for chunk byte length range, pick a lower bound that guarantees at least
        // one block will be read
        let chunk_byte_len = rand::thread_rng().gen_range(2000..=10_000);
        trace!(target: "downloaders::file::test", chunk_byte_len);

        // init reader
        let mut reader = ChunkedFileReader::from_file(file, chunk_byte_len as u64).await.unwrap();

        let mut downloaded_headers: Vec<SealedHeader> = vec![];

        let mut local_header = headers.first().unwrap().clone();

        // test
        while let Some(client) = reader.next_chunk::<FileClient>().await.unwrap() {
            let sync_target = client.tip_header().unwrap();

            let sync_target_hash = sync_target.hash();

            // construct headers downloader and use first header
            let mut header_downloader = ReverseHeadersDownloaderBuilder::default()
                .build(Arc::clone(&Arc::new(client)), Arc::new(TestConsensus::default()));
            header_downloader.update_local_head(local_header.clone());
            header_downloader.update_sync_target(SyncTarget::Tip(sync_target_hash));

            // get headers first
            let mut downloaded_headers_chunk = header_downloader.next().await.unwrap().unwrap();

            // export new local header to outer scope
            local_header = sync_target;

            // reverse to make sure it's in the right order before comparing
            downloaded_headers_chunk.reverse();
            downloaded_headers.extend_from_slice(&downloaded_headers_chunk);
        }

        // the first header is not included in the response
        assert_eq!(headers[1..], downloaded_headers);
    }
}