1use super::queue::BodiesRequestQueue;
2use crate::{bodies::task::TaskDownloader, metrics::BodyDownloaderMetrics};
3use alloy_consensus::BlockHeader;
4use alloy_primitives::BlockNumber;
5use futures::Stream;
6use futures_util::StreamExt;
7use reth_config::BodiesConfig;
8use reth_consensus::{Consensus, ConsensusError};
9use reth_network_p2p::{
10 bodies::{
11 client::BodiesClient,
12 downloader::{BodyDownloader, BodyDownloaderResult},
13 response::BlockResponse,
14 },
15 error::{DownloadError, DownloadResult},
16};
17use reth_primitives_traits::{size::InMemorySize, Block, SealedHeader};
18use reth_storage_api::HeaderProvider;
19use reth_tasks::{TaskSpawner, TokioTaskExecutor};
20use std::{
21 cmp::Ordering,
22 collections::BinaryHeap,
23 fmt::Debug,
24 ops::RangeInclusive,
25 pin::Pin,
26 sync::Arc,
27 task::{Context, Poll},
28};
29use tracing::info;
30
/// Stream-based downloader of block bodies for a configured range of block numbers.
///
/// Headers are read from `provider`, the matching bodies are requested through `client`, and
/// completed responses are buffered and re-ordered so that batches are yielded in ascending
/// block-number order.
#[must_use = "Stream does nothing unless polled"]
#[derive(Debug)]
pub struct BodiesDownloader<
    B: Block,
    C: BodiesClient<Body = B::Body>,
    Provider: HeaderProvider<Header = B::Header>,
> {
    /// Client used to request bodies; also queried for the number of connected peers.
    client: Arc<C>,
    /// Consensus implementation handed to every newly submitted request.
    consensus: Arc<dyn Consensus<B, Error = ConsensusError>>,
    /// Source of the sealed headers whose bodies are requested.
    provider: Provider,
    /// Maximum number of non-empty headers included in a single request.
    request_limit: u64,
    /// Number of bodies per yielded batch; also caps the number of headers gathered for one
    /// request.
    stream_batch_size: usize,
    /// Bounds applied to the number of concurrently running requests.
    concurrent_requests_range: RangeInclusive<usize>,
    /// Buffered-size threshold (in bytes); once reached, no new requests are submitted.
    max_buffered_blocks_size_bytes: usize,
    /// Sum of the sizes of all responses currently held in `buffered_responses`.
    buffered_blocks_size_bytes: usize,
    /// Inclusive range of block numbers to download; the empty range `1..=0` when unset.
    download_range: RangeInclusive<BlockNumber>,
    /// Number of the last block moved into `queued_bodies`, if any.
    latest_queued_block_number: Option<BlockNumber>,
    /// Requests that were submitted but have not completed yet.
    in_progress_queue: BodiesRequestQueue<B, C>,
    /// Completed responses waiting for their turn; the heap keeps the response with the lowest
    /// first block number on top.
    buffered_responses: BinaryHeap<OrderedBodiesResponse<B>>,
    /// Bodies in block order, ready to be split into batches and yielded from the stream.
    queued_bodies: Vec<BlockResponse<B>>,
    /// Metrics updated as requests, buffers and queues change.
    metrics: BodyDownloaderMetrics,
}
70
71impl<B, C, Provider> BodiesDownloader<B, C, Provider>
72where
73 B: Block,
74 C: BodiesClient<Body = B::Body> + 'static,
75 Provider: HeaderProvider<Header = B::Header> + Unpin + 'static,
76{
77 fn next_headers_request(&self) -> DownloadResult<Option<Vec<SealedHeader<Provider::Header>>>> {
79 let start_at = match self.in_progress_queue.last_requested_block_number {
80 Some(num) => num + 1,
81 None => *self.download_range.start(),
82 };
83 let items_left = (self.download_range.end() + 1).saturating_sub(start_at);
85 let limit = items_left.min(self.request_limit);
86 self.query_headers(start_at..=*self.download_range.end(), limit)
87 }
88
89 fn query_headers(
100 &self,
101 range: RangeInclusive<BlockNumber>,
102 max_non_empty: u64,
103 ) -> DownloadResult<Option<Vec<SealedHeader<B::Header>>>> {
104 if range.is_empty() || max_non_empty == 0 {
105 return Ok(None)
106 }
107
108 let mut collected = 0;
114 let mut non_empty_headers = 0;
115 let headers = self.provider.sealed_headers_while(range.clone(), |header| {
116 let should_take = range.contains(&header.number()) &&
117 non_empty_headers < max_non_empty &&
118 collected < self.stream_batch_size;
119
120 if should_take {
121 collected += 1;
122 if !header.is_empty() {
123 non_empty_headers += 1;
124 }
125 true
126 } else {
127 false
128 }
129 })?;
130
131 Ok(Some(headers).filter(|h| !h.is_empty()))
132 }
133
134 const fn next_expected_block_number(&self) -> BlockNumber {
136 match self.latest_queued_block_number {
137 Some(num) => num + 1,
138 None => *self.download_range.start(),
139 }
140 }
141
142 #[inline]
147 fn concurrent_request_limit(&self) -> usize {
148 let num_peers = self.client.num_connected_peers();
149
150 let max_requests = num_peers.max(*self.concurrent_requests_range.start());
151
152 if num_peers < *self.concurrent_requests_range.start() {
154 return max_requests
155 }
156
157 max_requests.min(*self.concurrent_requests_range.end())
158 }
159
160 const fn has_buffer_capacity(&self) -> bool {
162 self.buffered_blocks_size_bytes < self.max_buffered_blocks_size_bytes
163 }
164
165 fn is_terminated(&self) -> bool {
167 let nothing_to_request = self.download_range.is_empty() ||
169 self.in_progress_queue
171 .last_requested_block_number.is_some_and(|last| last == *self.download_range.end());
172
173 nothing_to_request &&
174 self.in_progress_queue.is_empty() &&
175 self.buffered_responses.is_empty() &&
176 self.queued_bodies.is_empty()
177 }
178
179 fn clear(&mut self) {
183 self.download_range = RangeInclusive::new(1, 0);
184 self.latest_queued_block_number.take();
185 self.in_progress_queue.clear();
186 self.queued_bodies = Vec::new();
187 self.buffered_responses = BinaryHeap::new();
188 self.buffered_blocks_size_bytes = 0;
189
190 self.metrics.in_flight_requests.set(0.);
192 self.metrics.buffered_responses.set(0.);
193 self.metrics.buffered_blocks.set(0.);
194 self.metrics.buffered_blocks_size_bytes.set(0.);
195 self.metrics.queued_blocks.set(0.);
196 }
197
198 fn queue_bodies(&mut self, bodies: Vec<BlockResponse<B>>) {
200 self.latest_queued_block_number = Some(bodies.last().expect("is not empty").block_number());
201 self.queued_bodies.extend(bodies);
202 self.metrics.queued_blocks.set(self.queued_bodies.len() as f64);
203 }
204
205 fn pop_buffered_response(&mut self) -> Option<OrderedBodiesResponse<B>> {
207 let resp = self.buffered_responses.pop()?;
208 self.metrics.buffered_responses.decrement(1.);
209 self.buffered_blocks_size_bytes -= resp.size();
210 self.metrics.buffered_blocks.decrement(resp.len() as f64);
211 self.metrics.buffered_blocks_size_bytes.set(self.buffered_blocks_size_bytes as f64);
212 Some(resp)
213 }
214
215 fn buffer_bodies_response(&mut self, response: Vec<BlockResponse<B>>) {
217 let size = response.iter().map(BlockResponse::size).sum::<usize>();
218
219 let response = OrderedBodiesResponse { resp: response, size };
220 let response_len = response.len();
221
222 self.buffered_blocks_size_bytes += size;
223 self.buffered_responses.push(response);
224
225 self.metrics.buffered_blocks.increment(response_len as f64);
226 self.metrics.buffered_blocks_size_bytes.set(self.buffered_blocks_size_bytes as f64);
227 self.metrics.buffered_responses.set(self.buffered_responses.len() as f64);
228 }
229
230 fn try_next_buffered(&mut self) -> Option<Vec<BlockResponse<B>>> {
232 if let Some(next) = self.buffered_responses.peek() {
233 let expected = self.next_expected_block_number();
234 let next_block_range = next.block_range();
235
236 if next_block_range.contains(&expected) {
237 return self.pop_buffered_response().map(|buffered| {
238 buffered
239 .resp
240 .into_iter()
241 .skip_while(|b| b.block_number() < expected)
242 .take_while(|b| self.download_range.contains(&b.block_number()))
243 .collect()
244 })
245 }
246
247 if *next_block_range.end() < expected {
249 self.pop_buffered_response();
250 }
251 }
252 None
253 }
254
255 fn try_split_next_batch(&mut self) -> Option<Vec<BlockResponse<B>>> {
258 if self.queued_bodies.len() >= self.stream_batch_size {
259 let next_batch = self.queued_bodies.drain(..self.stream_batch_size).collect::<Vec<_>>();
260 self.queued_bodies.shrink_to_fit();
261 self.metrics.total_flushed.increment(next_batch.len() as u64);
262 self.metrics.queued_blocks.set(self.queued_bodies.len() as f64);
263 return Some(next_batch)
264 }
265 None
266 }
267
268 fn can_submit_new_request(&self) -> bool {
273 self.queued_bodies.len() < 4 * self.stream_batch_size &&
277 self.has_buffer_capacity() &&
278 self.in_progress_queue.len() < self.concurrent_request_limit()
279 }
280}
281
282impl<B, C, Provider> BodiesDownloader<B, C, Provider>
283where
284 B: Block + 'static,
285 C: BodiesClient<Body = B::Body> + 'static,
286 Provider: HeaderProvider<Header = B::Header> + Unpin + 'static,
287{
288 pub fn into_task(self) -> TaskDownloader<B> {
290 self.into_task_with(&TokioTaskExecutor::default())
291 }
292
293 pub fn into_task_with<S>(self, spawner: &S) -> TaskDownloader<B>
295 where
296 S: TaskSpawner,
297 {
298 TaskDownloader::spawn_with(self, spawner)
299 }
300}
301
302impl<B, C, Provider> BodyDownloader for BodiesDownloader<B, C, Provider>
303where
304 B: Block + 'static,
305 C: BodiesClient<Body = B::Body> + 'static,
306 Provider: HeaderProvider<Header = B::Header> + Unpin + 'static,
307{
308 type Block = B;
309
310 fn set_download_range(&mut self, range: RangeInclusive<BlockNumber>) -> DownloadResult<()> {
317 if range.is_empty() {
319 tracing::error!(target: "downloaders::bodies", ?range, "Bodies download range is invalid (empty)");
320 return Err(DownloadError::InvalidBodyRange { range })
321 }
322
323 let is_current_range_subset = self.download_range.contains(range.start()) &&
325 *range.end() == *self.download_range.end();
326 if is_current_range_subset {
327 tracing::trace!(target: "downloaders::bodies", ?range, "Download range already in progress");
328 return Ok(())
330 }
331
332 let count = *range.end() - *range.start() + 1; let is_next_consecutive_range = *range.start() == *self.download_range.end() + 1;
335 if is_next_consecutive_range {
336 tracing::trace!(target: "downloaders::bodies", ?range, "New download range set");
338 info!(target: "downloaders::bodies", count, ?range, "Downloading bodies");
339 self.download_range = range;
340 return Ok(())
341 }
342
343 tracing::trace!(target: "downloaders::bodies", ?range, prev_range = ?self.download_range, "Download range reset");
346 info!(target: "downloaders::bodies", count, ?range, "Downloading bodies");
347 if let Some(last_returned) = self.latest_queued_block_number &&
349 *range.start() < last_returned
350 {
351 self.metrics.out_of_order_requests.increment(1);
352 }
353 self.clear();
354 self.download_range = range;
355 Ok(())
356 }
357}
358
impl<B, C, Provider> Stream for BodiesDownloader<B, C, Provider>
where
    B: Block + 'static,
    C: BodiesClient<Body = B::Body> + 'static,
    Provider: HeaderProvider<Header = B::Header> + Unpin + 'static,
{
    type Item = BodyDownloaderResult<B>;

    /// Drives the download: yields full batches, polls in-flight requests, submits new requests
    /// and moves buffered responses into the output queue.
    ///
    /// Returns `Ready(None)` once nothing is left to do, `Ready(Some(Err(_)))` after a failed
    /// request or header lookup (the downloader state is cleared first), and `Pending` while
    /// requests are still in flight and no batch is ready.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        if this.is_terminated() {
            return Poll::Ready(None)
        }
        // Keep polling and submitting until an iteration submits no new request.
        loop {
            // Yield a full batch as soon as one is available.
            if let Some(next_batch) = this.try_split_next_batch() {
                return Poll::Ready(Some(Ok(next_batch)))
            }

            // Drain every request that has completed so far.
            while let Poll::Ready(Some(response)) = this.in_progress_queue.poll_next_unpin(cx) {
                this.metrics.in_flight_requests.decrement(1.);
                match response {
                    Ok(response) => {
                        this.buffer_bodies_response(response);
                    }
                    Err(error) => {
                        tracing::debug!(target: "downloaders::bodies", %error, "Request failed");
                        this.clear();
                        return Poll::Ready(Some(Err(error)))
                    }
                };
            }

            // Doubles as the loop exit condition: stays `false` if nothing could be submitted.
            let mut new_request_submitted = false;
            // Submit new requests while capacity allows and headers remain.
            'inner: while this.can_submit_new_request() {
                match this.next_headers_request() {
                    Ok(Some(request)) => {
                        this.metrics.in_flight_requests.increment(1.);
                        this.in_progress_queue.push_new_request(
                            Arc::clone(&this.client),
                            Arc::clone(&this.consensus),
                            request,
                        );
                        new_request_submitted = true;
                    }
                    Ok(None) => break 'inner,
                    Err(error) => {
                        tracing::error!(target: "downloaders::bodies", %error, "Failed to download from next request");
                        this.clear();
                        return Poll::Ready(Some(Err(error)))
                    }
                };
            }

            // Move buffered responses that continue the sequence into the output queue.
            while let Some(buf_response) = this.try_next_buffered() {
                this.queue_bodies(buf_response);
            }

            // Release unused heap capacity so the buffer does not stay grown.
            this.buffered_responses.shrink_to_fit();

            if !new_request_submitted {
                break
            }
        }

        // No request is in flight: flush what is queued (possibly a partial batch) or finish.
        if this.in_progress_queue.is_empty() {
            if this.queued_bodies.is_empty() {
                return Poll::Ready(None)
            }
            let batch_size = this.stream_batch_size.min(this.queued_bodies.len());
            let next_batch = this.queued_bodies.drain(..batch_size).collect::<Vec<_>>();
            this.queued_bodies.shrink_to_fit();
            this.metrics.total_flushed.increment(next_batch.len() as u64);
            this.metrics.queued_blocks.set(this.queued_bodies.len() as f64);
            return Poll::Ready(Some(Ok(next_batch)))
        }

        Poll::Pending
    }
}
445
/// A completed bodies response together with its precomputed size, ordered by its first block
/// number so it can be kept in a [`BinaryHeap`].
#[derive(Debug)]
struct OrderedBodiesResponse<B: Block> {
    /// The downloaded blocks; the accessors below expect this to be non-empty.
    resp: Vec<BlockResponse<B>>,
    /// Size of the response as supplied by whoever constructs the value.
    size: usize,
}
452
impl<B: Block> OrderedBodiesResponse<B> {
    /// Returns the number of blocks in the response.
    #[inline]
    const fn len(&self) -> usize {
        self.resp.len()
    }

    /// Returns the stored size of the response (the `size` field, not recomputed).
    #[inline]
    const fn size(&self) -> usize {
        self.size
    }
}
467
468impl<B: Block> OrderedBodiesResponse<B> {
469 fn first_block_number(&self) -> u64 {
474 self.resp.first().expect("is not empty").block_number()
475 }
476
477 fn block_range(&self) -> RangeInclusive<u64> {
482 self.first_block_number()..=self.resp.last().expect("is not empty").block_number()
483 }
484}
485
486impl<B: Block> PartialEq for OrderedBodiesResponse<B> {
487 fn eq(&self, other: &Self) -> bool {
488 self.first_block_number() == other.first_block_number()
489 }
490}
491
// Equality compares first block numbers (`u64`), which is a total equivalence relation.
impl<B: Block> Eq for OrderedBodiesResponse<B> {}
493
impl<B: Block> PartialOrd for OrderedBodiesResponse<B> {
    /// Delegates to [`Ord::cmp`], so the partial and total orders always agree.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
499
500impl<B: Block> Ord for OrderedBodiesResponse<B> {
501 fn cmp(&self, other: &Self) -> Ordering {
502 self.first_block_number().cmp(&other.first_block_number()).reverse()
503 }
504}
505
/// Builder for [`BodiesDownloader`].
#[derive(Debug, Clone)]
pub struct BodiesDownloaderBuilder {
    /// Maximum number of non-empty headers included in a single request.
    pub request_limit: u64,
    /// Number of bodies per batch yielded from the stream.
    pub stream_batch_size: usize,
    /// Buffered-response size (in bytes) at which the downloader stops submitting requests.
    pub max_buffered_blocks_size_bytes: usize,
    /// Lower and upper bound for the number of concurrent requests.
    pub concurrent_requests_range: RangeInclusive<usize>,
}
518
519impl BodiesDownloaderBuilder {
520 pub fn new(config: BodiesConfig) -> Self {
523 Self::default()
524 .with_stream_batch_size(config.downloader_stream_batch_size)
525 .with_request_limit(config.downloader_request_limit)
526 .with_max_buffered_blocks_size_bytes(config.downloader_max_buffered_blocks_size_bytes)
527 .with_concurrent_requests_range(
528 config.downloader_min_concurrent_requests..=
529 config.downloader_max_concurrent_requests,
530 )
531 }
532}
533
534impl Default for BodiesDownloaderBuilder {
535 fn default() -> Self {
536 Self {
537 request_limit: 200,
538 stream_batch_size: 1_000,
539 max_buffered_blocks_size_bytes: 2 * 1024 * 1024 * 1024, concurrent_requests_range: 5..=100,
541 }
542 }
543}
544
545impl BodiesDownloaderBuilder {
546 pub const fn with_request_limit(mut self, request_limit: u64) -> Self {
548 self.request_limit = request_limit;
549 self
550 }
551
552 pub const fn with_stream_batch_size(mut self, stream_batch_size: usize) -> Self {
554 self.stream_batch_size = stream_batch_size;
555 self
556 }
557
558 pub const fn with_concurrent_requests_range(
560 mut self,
561 concurrent_requests_range: RangeInclusive<usize>,
562 ) -> Self {
563 self.concurrent_requests_range = concurrent_requests_range;
564 self
565 }
566
567 pub const fn with_max_buffered_blocks_size_bytes(
569 mut self,
570 max_buffered_blocks_size_bytes: usize,
571 ) -> Self {
572 self.max_buffered_blocks_size_bytes = max_buffered_blocks_size_bytes;
573 self
574 }
575
576 pub fn build<B, C, Provider>(
578 self,
579 client: C,
580 consensus: Arc<dyn Consensus<B, Error = ConsensusError>>,
581 provider: Provider,
582 ) -> BodiesDownloader<B, C, Provider>
583 where
584 B: Block,
585 C: BodiesClient<Body = B::Body> + 'static,
586 Provider: HeaderProvider<Header = B::Header>,
587 {
588 let Self {
589 request_limit,
590 stream_batch_size,
591 concurrent_requests_range,
592 max_buffered_blocks_size_bytes,
593 } = self;
594 let metrics = BodyDownloaderMetrics::default();
595 let in_progress_queue = BodiesRequestQueue::new(metrics.clone());
596 BodiesDownloader {
597 client: Arc::new(client),
598 consensus,
599 provider,
600 request_limit,
601 stream_batch_size,
602 max_buffered_blocks_size_bytes,
603 concurrent_requests_range,
604 in_progress_queue,
605 metrics,
606 download_range: RangeInclusive::new(1, 0),
607 latest_queued_block_number: None,
608 buffered_responses: Default::default(),
609 queued_bodies: Default::default(),
610 buffered_blocks_size_bytes: 0,
611 }
612 }
613}
614
615#[cfg(test)]
616mod tests {
617 use super::*;
618 use crate::{
619 bodies::test_utils::{insert_headers, zip_blocks},
620 test_utils::{generate_bodies, TestBodiesClient},
621 };
622 use alloy_primitives::B256;
623 use assert_matches::assert_matches;
624 use reth_consensus::test_utils::TestConsensus;
625 use reth_provider::test_utils::create_test_provider_factory;
626 use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams};
627 use std::collections::HashMap;
628
    /// Downloads blocks `0..=19` with default settings and checks that the first item from the
    /// stream contains all of them in header order, after exactly one client request.
    #[tokio::test]
    async fn streams_bodies_in_order() {
        let factory = create_test_provider_factory();
        let (headers, mut bodies) = generate_bodies(0..=19);

        insert_headers(&factory, &headers);

        // NOTE(review): `with_should_delay(true)` presumably makes the test client answer with
        // a delay -- confirm against `TestBodiesClient`.
        let client = Arc::new(
            TestBodiesClient::default().with_bodies(bodies.clone()).with_should_delay(true),
        );

        let mut downloader = BodiesDownloaderBuilder::default()
            .build::<reth_ethereum_primitives::Block, _, _>(
                client.clone(),
                Arc::new(TestConsensus::default()),
                factory,
            );
        downloader.set_download_range(0..=19).expect("failed to set download range");

        assert_matches!(
            downloader.next().await,
            Some(Ok(res)) => assert_eq!(res, zip_blocks(headers.iter(), &mut bodies))
        );
        assert_eq!(client.times_requested(), 1);
    }
657
    /// Downloads 200 blocks with a request limit of 10 and checks that the client was asked
    /// exactly 20 times.
    #[tokio::test]
    async fn requests_correct_number_of_times() {
        let factory = create_test_provider_factory();
        let mut rng = generators::rng();
        // `tx_count: 1..2` gives every generated block exactly one transaction.
        let blocks = random_block_range(
            &mut rng,
            0..=199,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 1..2, ..Default::default() },
        );

        let headers = blocks.iter().map(|block| block.clone_sealed_header()).collect::<Vec<_>>();
        let bodies = blocks
            .into_iter()
            .map(|block| (block.hash(), block.into_body()))
            .collect::<HashMap<_, _>>();

        insert_headers(&factory, &headers);

        let request_limit = 10;
        let client = Arc::new(TestBodiesClient::default().with_bodies(bodies.clone()));

        let mut downloader = BodiesDownloaderBuilder::default()
            .with_request_limit(request_limit)
            .build::<reth_ethereum_primitives::Block, _, _>(
                client.clone(),
                Arc::new(TestConsensus::default()),
                factory,
            );
        downloader.set_download_range(0..=199).expect("failed to set download range");

        // Drain the stream; only the number of requests is asserted.
        let _ = downloader.collect::<Vec<_>>().await;
        assert_eq!(client.times_requested(), 20);
    }
694
    /// Re-sets the download range to `range_start..=99` before every batch and checks that each
    /// batch still starts at `range_start` and arrives in header order.
    #[tokio::test]
    async fn streams_bodies_in_order_after_range_reset() {
        let factory = create_test_provider_factory();
        let (headers, mut bodies) = generate_bodies(0..=99);

        insert_headers(&factory, &headers);

        let stream_batch_size = 20;
        let request_limit = 10;
        let client = Arc::new(
            TestBodiesClient::default().with_bodies(bodies.clone()).with_should_delay(true),
        );
        let mut downloader = BodiesDownloaderBuilder::default()
            .with_stream_batch_size(stream_batch_size)
            .with_request_limit(request_limit)
            .build::<reth_ethereum_primitives::Block, _, _>(
                client.clone(),
                Arc::new(TestConsensus::default()),
                factory,
            );

        let mut range_start = 0;
        while range_start < 100 {
            downloader.set_download_range(range_start..=99).expect("failed to set download range");

            assert_matches!(
                downloader.next().await,
                Some(Ok(res)) => assert_eq!(res, zip_blocks(headers.iter().skip(range_start as usize).take(stream_batch_size), &mut bodies))
            );
            assert!(downloader.latest_queued_block_number >= Some(range_start));
            // Advance by one batch so the next range starts where this batch ended.
            range_start += stream_batch_size as u64;
        }
    }
731
    /// Checks that, after the stream finished the range `0..=99`, setting the consecutive range
    /// `100..=199` makes the downloader yield the remaining bodies.
    #[tokio::test]
    async fn can_download_new_range_after_termination() {
        let factory = create_test_provider_factory();
        let (headers, mut bodies) = generate_bodies(0..=199);

        insert_headers(&factory, &headers);

        let client = Arc::new(TestBodiesClient::default().with_bodies(bodies.clone()));

        let mut downloader = BodiesDownloaderBuilder::default()
            .with_stream_batch_size(100)
            .build::<reth_ethereum_primitives::Block, _, _>(
                client.clone(),
                Arc::new(TestConsensus::default()),
                factory,
            );

        // First range: a single batch with the first 100 bodies.
        downloader.set_download_range(0..=99).expect("failed to set download range");
        assert_matches!(
            downloader.next().await,
            Some(Ok(res)) => assert_eq!(res, zip_blocks(headers.iter().take(100), &mut bodies))
        );

        // The stream terminates once the range is exhausted.
        assert!(downloader.next().await.is_none());

        // Second range: the stream resumes with the next 100 bodies.
        downloader.set_download_range(100..=199).expect("failed to set download range");
        assert_matches!(
            downloader.next().await,
            Some(Ok(res)) => assert_eq!(res, zip_blocks(headers.iter().skip(100), &mut bodies))
        );
    }
769
    /// Checks that the downloader keeps yielding in-order batches with a buffer limit of one
    /// byte and a request limit of one.
    #[tokio::test]
    async fn can_download_after_exceeding_limit() {
        let factory = create_test_provider_factory();
        let (headers, mut bodies) = generate_bodies(0..=199);

        insert_headers(&factory, &headers);

        let client = Arc::new(TestBodiesClient::default().with_bodies(bodies.clone()));

        // A request limit of 1 and a 1-byte buffer limit keep the downloader at its limits.
        let mut downloader = BodiesDownloaderBuilder::default()
            .with_stream_batch_size(10)
            .with_request_limit(1)
            .with_max_buffered_blocks_size_bytes(1)
            .build::<reth_ethereum_primitives::Block, _, _>(
                client.clone(),
                Arc::new(TestConsensus::default()),
                factory,
            );

        downloader.set_download_range(0..=199).expect("failed to set download range");
        // `header` tracks how many headers have been matched against yielded bodies so far.
        let mut header = 0;
        while let Some(Ok(resp)) = downloader.next().await {
            assert_eq!(resp, zip_blocks(headers.iter().skip(header).take(resp.len()), &mut bodies));
            header += resp.len();
        }
    }
801
    /// Checks that all 100 bodies are still delivered in one in-order batch when the test
    /// client is configured with `with_empty_responses(2)`.
    #[tokio::test]
    async fn can_tolerate_empty_responses() {
        let factory = create_test_provider_factory();
        let (headers, mut bodies) = generate_bodies(0..=99);

        insert_headers(&factory, &headers);

        // NOTE(review): `with_empty_responses(2)` presumably makes the client return two empty
        // responses before serving bodies -- confirm against `TestBodiesClient`.
        let client = Arc::new(
            TestBodiesClient::default().with_bodies(bodies.clone()).with_empty_responses(2),
        );

        let mut downloader = BodiesDownloaderBuilder::default()
            .with_request_limit(3)
            .with_stream_batch_size(100)
            .build::<reth_ethereum_primitives::Block, _, _>(
                client.clone(),
                Arc::new(TestConsensus::default()),
                factory,
            );

        downloader.set_download_range(0..=99).expect("failed to set download range");
        assert_matches!(
            downloader.next().await,
            Some(Ok(res)) => assert_eq!(res, zip_blocks(headers.iter().take(100), &mut bodies))
        );
    }
832}