1use super::headers::client::HeadersRequest;
2use crate::{
3 block_access_lists::client::{BalRequirement, BlockAccessListsClient},
4 bodies::client::{BodiesClient, SingleBodyRequest},
5 download::DownloadClient,
6 error::PeerRequestResult,
7 headers::client::{HeadersClient, SingleHeaderRequest},
8 priority::Priority,
9 BlockClient,
10};
11use alloy_consensus::BlockHeader;
12use alloy_eip7928::bal::RawBal;
13use alloy_primitives::{Bytes, Sealable, B256};
14use core::marker::PhantomData;
15use futures::FutureExt;
16use reth_consensus::Consensus;
17use reth_eth_wire_types::{
18 BlockAccessLists, EthNetworkPrimitives, HeadersDirection, NetworkPrimitives,
19};
20use reth_network_peers::{PeerId, WithPeerId};
21use reth_primitives_traits::{Block, SealedBlock, SealedBlockWith, SealedHeader};
22use std::{
23 cmp::Reverse,
24 collections::{HashMap, VecDeque},
25 fmt::Debug,
26 hash::Hash,
27 ops::RangeInclusive,
28 pin::Pin,
29 sync::Arc,
30 task::{ready, Context, Poll},
31};
32use tracing::{debug, trace};
33
34pub type SealedBlockWithAccessList<B> = SealedBlockWith<B, Option<RawBal>>;
36
37#[derive(Debug, Clone)]
39pub struct FullBlockClient<Client>
40where
41 Client: BlockClient,
42{
43 client: Client,
44 consensus: Arc<dyn Consensus<Client::Block>>,
45}
46
47impl<Client> FullBlockClient<Client>
48where
49 Client: BlockClient,
50{
51 pub fn new(client: Client, consensus: Arc<dyn Consensus<Client::Block>>) -> Self {
53 Self { client, consensus }
54 }
55
56 #[cfg(any(test, feature = "test-utils"))]
58 pub fn test_client(client: Client) -> Self {
59 Self::new(client, Arc::new(reth_consensus::test_utils::TestConsensus::default()))
60 }
61}
62
63impl<Client> FullBlockClient<Client>
64where
65 Client: BlockClient,
66{
67 pub fn get_full_block(&self, hash: B256) -> FetchFullBlockFuture<Client> {
74 FetchFullBlockFuture::new(self.client.clone(), self.consensus.clone(), hash)
75 }
76
77 pub fn get_full_block_range(
87 &self,
88 hash: B256,
89 count: u64,
90 ) -> FetchFullBlockRangeFuture<Client> {
91 let client = self.client.clone();
92 FetchFullBlockRangeFuture {
93 start_hash: hash,
94 count,
95 request: FullBlockRangeRequest {
96 headers: Some(client.get_headers(HeadersRequest::falling(hash.into(), count))),
97 bodies: None,
98 },
99 client,
100 headers: None,
101 pending_headers: VecDeque::new(),
102 bodies: HashMap::default(),
103 consensus: Arc::clone(&self.consensus),
104 }
105 }
106}
107
108impl<Client> FullBlockClient<Client>
109where
110 Client: BlockClient + BlockAccessListsClient,
111{
112 pub fn get_full_block_with_access_lists(
120 &self,
121 hash: B256,
122 ) -> FetchFullBlockWithBalFuture<Client> {
123 self.get_full_block_with_access_lists_with_requirement(hash, BalRequirement::default())
124 }
125
126 pub fn get_full_block_with_access_lists_with_requirement(
134 &self,
135 hash: B256,
136 requirement: BalRequirement,
137 ) -> FetchFullBlockWithBalFuture<Client> {
138 let client = self.client.clone();
139 FetchFullBlockWithBalFuture {
140 block: FetchFullBlockFuture::new(client.clone(), self.consensus.clone(), hash),
141 block_result: None,
142 bal_request_state: BalRequestState::Pending(
143 client.get_block_access_lists_with_requirement(vec![hash], requirement),
144 ),
145 }
146 }
147
148 pub fn get_full_block_range_with_optional_access_lists(
155 &self,
156 hash: B256,
157 count: u64,
158 ) -> FetchFullBlockRangeWithBalFuture<Client> {
159 self.get_full_block_range_with_optional_access_lists_with_requirement(
160 hash,
161 count,
162 BalRequirement::default(),
163 )
164 }
165
166 pub fn get_full_block_range_with_optional_access_lists_with_requirement(
173 &self,
174 hash: B256,
175 count: u64,
176 requirement: BalRequirement,
177 ) -> FetchFullBlockRangeWithBalFuture<Client> {
178 let client = self.client.clone();
179 FetchFullBlockRangeWithBalFuture {
180 blocks: self.get_full_block_range(hash, count),
181 client,
182 block_result: None,
183 access_lists: OptionalBlockAccessListsState::WaitingForBlocks { requirement },
184 }
185 }
186}
187
188#[must_use = "futures do nothing unless polled"]
193pub struct FetchFullBlockFuture<Client>
194where
195 Client: BlockClient,
196{
197 client: Client,
198 consensus: Arc<dyn Consensus<Client::Block>>,
199 hash: B256,
200 request: FullBlockRequest<Client>,
201 header: Option<SealedHeader<Client::Header>>,
202 body: Option<BodyResponse<Client::Body>>,
203}
204
205impl<Client> FetchFullBlockFuture<Client>
206where
207 Client: BlockClient,
208{
209 fn new(client: Client, consensus: Arc<dyn Consensus<Client::Block>>, hash: B256) -> Self {
210 Self {
211 hash,
212 consensus,
213 request: FullBlockRequest {
214 header: Some(client.get_header(hash.into())),
215 body: Some(client.get_block_body(hash)),
216 },
217 client,
218 header: None,
219 body: None,
220 }
221 }
222
223 pub const fn hash(&self) -> &B256 {
225 &self.hash
226 }
227
228 pub fn block_number(&self) -> Option<u64> {
230 self.header.as_ref().map(|h| h.number())
231 }
232
233 fn take_block(&mut self) -> Option<SealedBlock<Client::Block>> {
235 if self.header.is_none() || self.body.is_none() {
236 return None
237 }
238
239 let header = self.header.take().unwrap();
240 let resp = self.body.take().unwrap();
241 match resp {
242 BodyResponse::Validated(body) => Some(SealedBlock::from_sealed_parts(header, body)),
243 BodyResponse::PendingValidation(resp) => {
244 if let Err(err) = self.consensus.validate_body_against_header(resp.data(), &header)
246 {
247 debug!(target: "downloaders", %err, hash=?header.hash(), "Received wrong body");
248 self.client.report_bad_message(resp.peer_id());
249 self.header = Some(header);
250 self.request.body = Some(self.client.get_block_body(self.hash));
251 return None
252 }
253 Some(SealedBlock::from_sealed_parts(header, resp.into_data()))
254 }
255 }
256 }
257
258 fn on_block_response(&mut self, resp: WithPeerId<Client::Body>) {
259 if let Some(ref header) = self.header {
260 if let Err(err) = self.consensus.validate_body_against_header(resp.data(), header) {
261 debug!(target: "downloaders", %err, hash=?header.hash(), "Received wrong body");
262 self.client.report_bad_message(resp.peer_id());
263 return
264 }
265 self.body = Some(BodyResponse::Validated(resp.into_data()));
266 return
267 }
268 self.body = Some(BodyResponse::PendingValidation(resp));
269 }
270}
271
272impl<Client> Future for FetchFullBlockFuture<Client>
273where
274 Client: BlockClient<Header: BlockHeader + Sealable> + 'static,
275{
276 type Output = SealedBlock<Client::Block>;
277
278 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
279 let this = self.get_mut();
280
281 let mut budget = 4;
283
284 loop {
285 match ready!(this.request.poll(cx)) {
286 ResponseResult::Header(res) => {
287 match res {
288 Ok(maybe_header) => {
289 let (peer, maybe_header) =
290 maybe_header.map(|h| h.map(SealedHeader::seal_slow)).split();
291 if let Some(header) = maybe_header {
292 if header.hash() == this.hash {
293 this.header = Some(header);
294 } else {
295 debug!(target: "downloaders", expected=?this.hash, received=?header.hash(), "Received wrong header");
296 this.client.report_bad_message(peer)
298 }
299 }
300 }
301 Err(err) => {
302 debug!(target: "downloaders", %err, ?this.hash, "Header download failed");
303 }
304 }
305
306 if this.header.is_none() {
307 this.request.header = Some(this.client.get_header(this.hash.into()));
309 }
310 }
311 ResponseResult::Body(res) => {
312 match res {
313 Ok(maybe_body) => {
314 if let Some(body) = maybe_body.transpose() {
315 this.on_block_response(body);
316 }
317 }
318 Err(err) => {
319 debug!(target: "downloaders", %err, ?this.hash, "Body download failed");
320 }
321 }
322 if this.body.is_none() {
323 this.request.body = Some(this.client.get_block_body(this.hash));
325 }
326 }
327 }
328
329 if let Some(res) = this.take_block() {
330 return Poll::Ready(res)
331 }
332
333 budget -= 1;
335 if budget == 0 {
336 cx.waker().wake_by_ref();
338 return Poll::Pending
339 }
340 }
341 }
342}
343
344#[must_use = "futures do nothing unless polled"]
349pub struct FetchFullBlockWithBalFuture<Client>
350where
351 Client: BlockClient + BlockAccessListsClient,
352{
353 block: FetchFullBlockFuture<Client>,
354 block_result: Option<SealedBlock<Client::Block>>,
355 bal_request_state: BalRequestState<<Client as BlockAccessListsClient>::Output>,
356}
357
358impl<Client> FetchFullBlockWithBalFuture<Client>
359where
360 Client: BlockClient<Header: BlockHeader> + BlockAccessListsClient,
361{
362 pub const fn hash(&self) -> &B256 {
364 self.block.hash()
365 }
366}
367
368impl<Client> FetchFullBlockWithBalFuture<Client>
369where
370 Client: BlockClient<Header: BlockHeader + Sealable> + BlockAccessListsClient + 'static,
371{
372 pub fn block_number(&self) -> Option<u64> {
374 self.block_result.as_ref().map(|block| block.number()).or_else(|| self.block.block_number())
375 }
376
377 fn poll_bal_request(&mut self, cx: &mut Context<'_>) -> Poll<()> {
383 let res = match &mut self.bal_request_state {
384 BalRequestState::Pending(fut) => ready!(fut.poll_unpin(cx)),
385 BalRequestState::Ready(_) => return Poll::Ready(()),
386 };
387
388 match res {
389 Ok(bal) => {
390 let (peer, access_lists) = bal.split();
391 match access_lists.0.len() {
392 0 => self.bal_request_state = BalRequestState::Ready(None),
393 1 => {
394 let bal = access_lists.0.into_iter().next().expect("len checked");
395 self.bal_request_state =
396 BalRequestState::Ready(Some(WithPeerId::new(peer, bal)));
397 }
398 received => {
399 debug!(
400 target: "downloaders",
401 hash = ?self.block.hash(),
402 expected = 1,
403 received,
404 "Received wrong access list response",
405 );
406 self.block.client.report_bad_message(peer);
407 self.bal_request_state = BalRequestState::Ready(None);
408 }
409 }
410 }
411 Err(err) => {
412 debug!(
413 target: "downloaders",
414 %err,
415 hash = ?self.block.hash(),
416 "Access list download failed",
417 );
418 self.bal_request_state = BalRequestState::Ready(None);
419 }
420 }
421
422 Poll::Ready(())
423 }
424
425 fn take_block_and_access_lists(&mut self) -> Option<SealedBlockWithAccessList<Client::Block>> {
431 let BalRequestState::Ready(bal) = &mut self.bal_request_state else { return None };
432 let block = self.block_result.take()?;
433 let raw_bal =
434 bal.take().and_then(|bal| match seal_block_access_list_for_block(&block, bal) {
435 Ok(raw_bal) => raw_bal,
436 Err(peer) => {
437 self.block.client.report_bad_message(peer);
438 None
439 }
440 });
441 Some(SealedBlockWith::new(block, raw_bal))
442 }
443}
444
445impl<Client> Future for FetchFullBlockWithBalFuture<Client>
446where
447 Client: BlockClient<Header: BlockHeader + Sealable> + BlockAccessListsClient + 'static,
448{
449 type Output = SealedBlockWithAccessList<Client::Block>;
450
451 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
452 let this = self.get_mut();
453
454 if this.block_result.is_none() &&
455 let Poll::Ready(block) = this.block.poll_unpin(cx)
456 {
457 this.block_result = Some(block);
458 }
459
460 ready!(this.poll_bal_request(cx));
461
462 if let Some(res) = this.take_block_and_access_lists() {
463 return Poll::Ready(res)
464 }
465
466 Poll::Pending
467 }
468}
469
470impl<Client> Debug for FetchFullBlockWithBalFuture<Client>
471where
472 Client: BlockClient<Header: BlockHeader> + BlockAccessListsClient,
473{
474 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
475 f.debug_struct("FetchFullBlockWithBalFuture")
476 .field("hash", &self.block.hash())
477 .field("block_ready", &self.block_result.is_some())
478 .field("bal_request_ready", &self.bal_request_state.is_ready())
479 .finish()
480 }
481}
482
483enum BalRequestState<Req> {
485 Pending(Req),
486 Ready(Option<WithPeerId<Option<Bytes>>>),
487}
488
489impl<Req> BalRequestState<Req> {
490 const fn is_ready(&self) -> bool {
491 matches!(self, Self::Ready(_))
492 }
493}
494
495#[must_use = "futures do nothing unless polled"]
502#[expect(missing_debug_implementations)]
503pub struct FetchFullBlockRangeWithBalFuture<Client>
504where
505 Client: BlockClient + BlockAccessListsClient,
506{
507 blocks: FetchFullBlockRangeFuture<Client>,
508 client: Client,
509 block_result: Option<Vec<SealedBlock<Client::Block>>>,
510 access_lists: OptionalBlockAccessListsState<<Client as BlockAccessListsClient>::Output>,
511}
512
513impl<Client> FetchFullBlockRangeWithBalFuture<Client>
514where
515 Client: BlockClient<Header: Debug + BlockHeader + Sealable + Clone + Hash + Eq>
516 + BlockAccessListsClient,
517{
518 fn start_access_lists_request_if_possible(&mut self) {
519 let requirement = match &self.access_lists {
520 OptionalBlockAccessListsState::WaitingForBlocks { requirement } => *requirement,
521 OptionalBlockAccessListsState::Pending(_) | OptionalBlockAccessListsState::Ready(_) => {
522 return
523 }
524 };
525
526 let Some(blocks) = self.block_result.as_ref() else { return };
528 let hashes = blocks.iter().map(|block| block.hash()).collect::<Vec<_>>();
529 self.access_lists = OptionalBlockAccessListsState::Pending(
530 self.client.get_block_access_lists_with_requirement(hashes, requirement),
531 );
532 }
533
534 fn poll_access_lists(&mut self, cx: &mut Context<'_>) {
536 self.start_access_lists_request_if_possible();
537
538 let poll = match &mut self.access_lists {
539 OptionalBlockAccessListsState::Pending(fut) => fut.poll_unpin(cx),
540 OptionalBlockAccessListsState::WaitingForBlocks { .. } |
541 OptionalBlockAccessListsState::Ready(_) => return,
542 };
543
544 match poll {
545 Poll::Pending => {}
546 Poll::Ready(Ok(access_lists)) => {
547 self.access_lists = OptionalBlockAccessListsState::Ready(Some(access_lists));
548 }
549 Poll::Ready(Err(err)) => {
550 debug!(
551 target: "downloaders",
552 %err,
553 start_hash = ?self.blocks.start_hash(),
554 "Access list range download failed",
555 );
556
557 self.access_lists = OptionalBlockAccessListsState::Ready(None);
560 }
561 }
562 }
563
564 fn take_response(&mut self) -> Option<Vec<SealedBlockWithAccessList<Client::Block>>> {
566 let OptionalBlockAccessListsState::Ready(access_lists) = &mut self.access_lists else {
567 return None
568 };
569
570 let blocks = self.block_result.take()?;
571
572 Some(seal_blocks_with_access_lists(&self.client, blocks, access_lists.take()))
573 }
574}
575
576impl<Client> Future for FetchFullBlockRangeWithBalFuture<Client>
577where
578 Client: BlockClient<Header: Debug + BlockHeader + Sealable + Clone + Hash + Eq>
579 + BlockAccessListsClient
580 + 'static,
581{
582 type Output = Vec<SealedBlockWithAccessList<Client::Block>>;
583
584 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
585 let this = self.get_mut();
586
587 if this.block_result.is_none() &&
589 let Poll::Ready(blocks) = this.blocks.poll_unpin(cx)
590 {
591 this.block_result = Some(blocks);
592 }
593
594 this.poll_access_lists(cx);
595
596 if let Some(response) = this.take_response() {
597 return Poll::Ready(response)
598 }
599
600 Poll::Pending
601 }
602}
603
604enum OptionalBlockAccessListsState<Req> {
606 WaitingForBlocks {
608 requirement: BalRequirement,
610 },
611 Pending(Req),
613 Ready(Option<WithPeerId<BlockAccessLists>>),
615}
616
617impl<Client> Debug for FetchFullBlockFuture<Client>
618where
619 Client: BlockClient<Header: Debug, Body: Debug>,
620{
621 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
622 f.debug_struct("FetchFullBlockFuture")
623 .field("hash", &self.hash)
624 .field("header", &self.header)
625 .field("body", &self.body)
626 .finish()
627 }
628}
629
630struct FullBlockRequest<Client>
631where
632 Client: BlockClient,
633{
634 header: Option<SingleHeaderRequest<<Client as HeadersClient>::Output>>,
635 body: Option<SingleBodyRequest<<Client as BodiesClient>::Output>>,
636}
637
638impl<Client> FullBlockRequest<Client>
639where
640 Client: BlockClient,
641{
642 fn poll(&mut self, cx: &mut Context<'_>) -> Poll<ResponseResult<Client::Header, Client::Body>> {
643 if let Some(fut) = Pin::new(&mut self.header).as_pin_mut() &&
644 let Poll::Ready(res) = fut.poll(cx)
645 {
646 self.header = None;
647 return Poll::Ready(ResponseResult::Header(res))
648 }
649
650 if let Some(fut) = Pin::new(&mut self.body).as_pin_mut() &&
651 let Poll::Ready(res) = fut.poll(cx)
652 {
653 self.body = None;
654 return Poll::Ready(ResponseResult::Body(res))
655 }
656
657 Poll::Pending
658 }
659}
660
661enum ResponseResult<H, B> {
664 Header(PeerRequestResult<Option<H>>),
665 Body(PeerRequestResult<Option<B>>),
666}
667
668#[derive(Debug)]
670enum BodyResponse<B> {
671 Validated(B),
673 PendingValidation(WithPeerId<B>),
675}
676#[must_use = "futures do nothing unless polled"]
689#[expect(missing_debug_implementations)]
690pub struct FetchFullBlockRangeFuture<Client>
691where
692 Client: BlockClient,
693{
694 client: Client,
696 consensus: Arc<dyn Consensus<Client::Block>>,
698 start_hash: B256,
700 count: u64,
702 request: FullBlockRangeRequest<Client>,
704 headers: Option<Vec<SealedHeader<Client::Header>>>,
706 pending_headers: VecDeque<SealedHeader<Client::Header>>,
708 bodies: HashMap<SealedHeader<Client::Header>, BodyResponse<Client::Body>>,
710}
711
712impl<Client> FetchFullBlockRangeFuture<Client>
713where
714 Client: BlockClient<Header: Debug + BlockHeader + Sealable + Clone + Hash + Eq>,
715{
716 fn is_bodies_complete(&self) -> bool {
718 self.bodies.len() == self.count as usize
719 }
720
721 fn insert_body(&mut self, body_response: BodyResponse<Client::Body>) {
725 if let Some(header) = self.pending_headers.pop_front() {
726 self.bodies.insert(header, body_response);
727 }
728 }
729
730 fn insert_bodies(&mut self, bodies: impl IntoIterator<Item = BodyResponse<Client::Body>>) {
732 for body in bodies {
733 self.insert_body(body);
734 }
735 }
736
737 fn remaining_bodies_hashes(&self) -> Vec<B256> {
740 self.pending_headers.iter().map(|h| h.hash()).collect()
741 }
742
743 fn take_blocks(&mut self) -> Option<Vec<SealedBlock<Client::Block>>> {
751 if !self.is_bodies_complete() {
752 return None
754 }
755
756 let headers = self.headers.take()?;
757 let mut needs_retry = false;
758 let mut valid_responses = Vec::new();
759
760 for header in &headers {
761 if let Some(body_resp) = self.bodies.remove(header) {
762 let body = match body_resp {
764 BodyResponse::Validated(body) => body,
765 BodyResponse::PendingValidation(resp) => {
766 if let Err(err) =
768 self.consensus.validate_body_against_header(resp.data(), header)
769 {
770 debug!(target: "downloaders", %err, hash=?header.hash(), "Received wrong body in range response");
771 self.client.report_bad_message(resp.peer_id());
772
773 self.pending_headers.push_back(header.clone());
775 needs_retry = true;
776 continue
777 }
778
779 resp.into_data()
780 }
781 };
782
783 valid_responses
784 .push(SealedBlock::<Client::Block>::from_sealed_parts(header.clone(), body));
785 }
786 }
787
788 if needs_retry {
789 for block in valid_responses {
792 let (header, body) = block.split_sealed_header_body();
793 self.bodies.insert(header, BodyResponse::Validated(body));
794 }
795
796 self.headers = Some(headers);
798
799 let hashes = self.remaining_bodies_hashes();
801 self.request.bodies = Some(self.client.get_block_bodies(hashes));
802 return None
803 }
804
805 Some(valid_responses)
806 }
807
808 fn on_headers_response(&mut self, headers: WithPeerId<Vec<Client::Header>>) {
809 let (peer, mut headers_falling) =
810 headers.map(|h| h.into_iter().map(SealedHeader::seal_slow).collect::<Vec<_>>()).split();
811
812 if headers_falling.len() == self.count as usize {
814 headers_falling.sort_unstable_by_key(|h| Reverse(h.number()));
816
817 if headers_falling[0].hash() == self.start_hash {
819 let headers_rising = headers_falling.iter().rev().cloned().collect::<Vec<_>>();
820 if let Err(err) = self.consensus.validate_header_range(&headers_rising) {
822 debug!(target: "downloaders", %err, ?self.start_hash, "Received bad header response");
823 self.client.report_bad_message(peer);
824 }
825
826 let hashes = headers_falling.iter().map(|h| h.hash()).collect::<Vec<_>>();
828
829 self.pending_headers = headers_falling.clone().into();
831
832 if !self.has_bodies_request_started() {
834 self.request.bodies = Some(self.client.get_block_bodies(hashes));
836 }
837
838 self.headers = Some(headers_falling);
840 } else {
841 self.client.report_bad_message(peer);
843 }
844 }
845 }
846
847 const fn has_bodies_request_started(&self) -> bool {
850 self.request.bodies.is_some()
851 }
852
853 pub const fn start_hash(&self) -> B256 {
855 self.start_hash
856 }
857
858 pub const fn count(&self) -> u64 {
860 self.count
861 }
862}
863
864impl<Client> Future for FetchFullBlockRangeFuture<Client>
865where
866 Client: BlockClient<Header: Debug + BlockHeader + Sealable + Clone + Hash + Eq> + 'static,
867{
868 type Output = Vec<SealedBlock<Client::Block>>;
869
870 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
871 let this = self.get_mut();
872
873 loop {
874 match ready!(this.request.poll(cx)) {
875 RangeResponseResult::Header(res) => {
884 match res {
885 Ok(headers) => {
886 this.on_headers_response(headers);
887 }
888 Err(err) => {
889 debug!(target: "downloaders", %err, ?this.start_hash, "Header range download failed");
890 }
891 }
892
893 if this.headers.is_none() {
894 this.request.headers = Some(this.client.get_headers(HeadersRequest {
896 start: this.start_hash.into(),
897 limit: this.count,
898 direction: HeadersDirection::Falling,
899 }));
900 }
901 }
902 RangeResponseResult::Body(res) => {
908 match res {
909 Ok(bodies_resp) => {
910 let (peer, new_bodies) = bodies_resp.split();
911
912 this.insert_bodies(
914 new_bodies
915 .into_iter()
916 .map(|resp| WithPeerId::new(peer, resp))
917 .map(BodyResponse::PendingValidation),
918 );
919
920 if !this.is_bodies_complete() {
921 let req_hashes = this.remaining_bodies_hashes();
923
924 this.request.bodies = Some(this.client.get_block_bodies(req_hashes))
926 }
927 }
928 Err(err) => {
929 debug!(target: "downloaders", %err, ?this.start_hash, "Body range download failed");
930 }
931 }
932 if this.request.bodies.is_none() && !this.is_bodies_complete() {
933 let hashes = this.remaining_bodies_hashes();
946 if !hashes.is_empty() {
947 this.request.bodies = Some(this.client.get_block_bodies(hashes));
948 }
949 }
950 }
951 }
952
953 if let Some(res) = this.take_blocks() {
954 return Poll::Ready(res)
955 }
956 }
957 }
958}
959
960struct FullBlockRangeRequest<Client>
964where
965 Client: BlockClient,
966{
967 headers: Option<<Client as HeadersClient>::Output>,
968 bodies: Option<<Client as BodiesClient>::Output>,
969}
970
971impl<Client> FullBlockRangeRequest<Client>
972where
973 Client: BlockClient,
974{
975 fn poll(
976 &mut self,
977 cx: &mut Context<'_>,
978 ) -> Poll<RangeResponseResult<Client::Header, Client::Body>> {
979 if let Some(fut) = Pin::new(&mut self.headers).as_pin_mut() &&
980 let Poll::Ready(res) = fut.poll(cx)
981 {
982 self.headers = None;
983 return Poll::Ready(RangeResponseResult::Header(res))
984 }
985
986 if let Some(fut) = Pin::new(&mut self.bodies).as_pin_mut() &&
987 let Poll::Ready(res) = fut.poll(cx)
988 {
989 self.bodies = None;
990 return Poll::Ready(RangeResponseResult::Body(res))
991 }
992
993 Poll::Pending
994 }
995}
996
997enum RangeResponseResult<H, B> {
1000 Header(PeerRequestResult<Vec<H>>),
1001 Body(PeerRequestResult<Vec<B>>),
1002}
1003
1004#[derive(Debug, Clone)]
1006#[non_exhaustive]
1007pub struct NoopFullBlockClient<Net = EthNetworkPrimitives>(PhantomData<Net>);
1008
1009impl<Net> DownloadClient for NoopFullBlockClient<Net>
1011where
1012 Net: Debug + Send + Sync,
1013{
1014 fn report_bad_message(&self, _peer_id: PeerId) {}
1021
1022 fn num_connected_peers(&self) -> usize {
1028 0
1029 }
1030}
1031
1032impl<Net> BodiesClient for NoopFullBlockClient<Net>
1034where
1035 Net: NetworkPrimitives,
1036{
1037 type Body = Net::BlockBody;
1038 type Output = futures::future::Ready<PeerRequestResult<Vec<Self::Body>>>;
1040
1041 fn get_block_bodies_with_priority_and_range_hint(
1052 &self,
1053 _hashes: Vec<B256>,
1054 _priority: Priority,
1055 _range_hint: Option<RangeInclusive<u64>>,
1056 ) -> Self::Output {
1057 futures::future::ready(Ok(WithPeerId::new(PeerId::random(), vec![])))
1060 }
1061}
1062
1063impl<Net> HeadersClient for NoopFullBlockClient<Net>
1064where
1065 Net: NetworkPrimitives,
1066{
1067 type Header = Net::BlockHeader;
1068 type Output = futures::future::Ready<PeerRequestResult<Vec<Self::Header>>>;
1071
1072 fn get_headers_with_priority(
1086 &self,
1087 _request: HeadersRequest,
1088 _priority: Priority,
1089 ) -> Self::Output {
1090 futures::future::ready(Ok(WithPeerId::new(PeerId::random(), vec![])))
1091 }
1092}
1093
1094impl<Net> BlockClient for NoopFullBlockClient<Net>
1095where
1096 Net: NetworkPrimitives,
1097{
1098 type Block = Net::Block;
1099}
1100
1101impl<Net> Default for NoopFullBlockClient<Net> {
1102 fn default() -> Self {
1103 Self(PhantomData::<Net>)
1104 }
1105}
1106
1107fn seal_block_access_list_for_block<B: Block>(
1113 block: &SealedBlock<B>,
1114 bal: WithPeerId<Option<Bytes>>,
1115) -> Result<Option<RawBal>, PeerId> {
1116 let Some(expected) = block.header().block_access_list_hash() else { return Ok(None) };
1117
1118 let (peer, bal) = bal.split();
1119 let Some(bal) = bal else { return Ok(None) };
1120 let raw_bal = RawBal::new(bal);
1121 let computed = raw_bal.hash();
1122 if computed == expected {
1123 return Ok(Some(raw_bal))
1124 }
1125
1126 debug!(
1127 target: "downloaders",
1128 block_hash = ?block.hash(),
1129 ?computed,
1130 ?expected,
1131 "Received block access list with wrong hash",
1132 );
1133 Err(peer)
1134}
1135
1136fn seal_blocks_with_access_lists<Client>(
1143 client: &Client,
1144 blocks: Vec<SealedBlock<Client::Block>>,
1145 access_lists: Option<WithPeerId<BlockAccessLists>>,
1146) -> Vec<SealedBlockWithAccessList<Client::Block>>
1147where
1148 Client: BlockClient,
1149{
1150 let Some(access_lists) = access_lists else {
1151 return blocks.into_iter().map(SealedBlockWith::from_block).collect()
1152 };
1153
1154 let (peer, access_lists) = access_lists.split();
1155 let expected = blocks.len();
1156 let received = access_lists.0.len();
1157
1158 if received > expected {
1159 trace!(
1160 target: "downloaders",
1161 expected,
1162 received,
1163 "Ignoring overlong access list range response",
1164 );
1165 return blocks.into_iter().map(SealedBlockWith::from_block).collect()
1166 }
1167
1168 let mut access_lists = access_lists.0.into_iter();
1169 let mut blocks = blocks.into_iter();
1170 let mut response = Vec::with_capacity(expected);
1171
1172 for block in blocks.by_ref() {
1173 let Some(bal) = access_lists.next() else {
1174 response.push(SealedBlockWith::from_block(block));
1177 break
1178 };
1179
1180 match seal_block_access_list_for_block(&block, WithPeerId::new(peer, bal)) {
1181 Ok(raw_bal) => response.push(SealedBlockWith::new(block, raw_bal)),
1182 Err(peer) => {
1183 client.report_bad_message(peer);
1186 response.push(SealedBlockWith::from_block(block));
1187 break
1188 }
1189 }
1190 }
1191
1192 response.extend(blocks.map(SealedBlockWith::from_block));
1193 response
1194}
1195
1196#[cfg(test)]
1197mod tests {
1198 use reth_ethereum_primitives::BlockBody;
1199
1200 use super::*;
1201 use crate::{error::RequestError, test_utils::TestFullBlockClient};
1202 use alloy_consensus::Header;
1203 use alloy_primitives::{keccak256, map::B256Map, Bytes};
1204 use parking_lot::Mutex;
1205 use std::{
1206 ops::Range,
1207 sync::{
1208 atomic::{AtomicBool, AtomicUsize, Ordering},
1209 Arc,
1210 },
1211 };
1212
1213 const EMPTY_LIST_CODE: u8 = 0xc0;
1214 use tokio::time::{timeout, Duration};
1215
1216 fn sealed_header_with_access_list_hash(bal: &Bytes) -> SealedHeader {
1217 let header =
1218 Header { block_access_list_hash: Some(keccak256(bal.as_ref())), ..Default::default() };
1219 SealedHeader::seal_slow(header)
1220 }
1221
1222 fn range_access_lists<B: Block>(
1223 blocks: &[SealedBlockWithAccessList<B>],
1224 ) -> Vec<Option<RawBal>> {
1225 blocks.iter().map(|block| block.data().clone()).collect()
1226 }
1227
1228 #[tokio::test]
1229 async fn download_single_full_block() {
1230 let client = TestFullBlockClient::default();
1231 let header: SealedHeader = SealedHeader::default();
1232 let body = BlockBody::default();
1233 client.insert(header.clone(), body.clone());
1234 let client = FullBlockClient::test_client(client);
1235
1236 let received = client.get_full_block(header.hash()).await;
1237 assert_eq!(received, SealedBlock::from_sealed_parts(header, body));
1238 }
1239
1240 #[tokio::test]
1241 async fn download_single_full_block_range() {
1242 let client = TestFullBlockClient::default();
1243 let header: SealedHeader = SealedHeader::default();
1244 let body = BlockBody::default();
1245 client.insert(header.clone(), body.clone());
1246 let client = FullBlockClient::test_client(client);
1247
1248 let received = client.get_full_block_range(header.hash(), 1).await;
1249 let received = received.first().expect("response should include a block");
1250 assert_eq!(*received, SealedBlock::from_sealed_parts(header, body));
1251 }
1252
1253 #[tokio::test]
1254 async fn download_single_full_block_with_access_lists() {
1255 let client = FullBlockWithAccessListsClient::default();
1256 let body = BlockBody::default();
1257 let bal = Bytes::from_static(&[EMPTY_LIST_CODE]);
1258 let header = sealed_header_with_access_list_hash(&bal);
1259 client.insert(header.clone(), body.clone(), bal.clone());
1260
1261 let request_count = Arc::clone(&client.access_list_requests);
1262 let client = FullBlockClient::test_client(client);
1263
1264 let received = client.get_full_block_with_access_lists(header.hash()).await;
1265 let expected_raw_bal = RawBal::from(bal);
1266
1267 assert_eq!(received.block(), &SealedBlock::from_sealed_parts(header, body));
1268 assert_eq!(received.data().as_ref(), Some(&expected_raw_bal));
1269 assert_eq!(request_count.load(Ordering::SeqCst), 1);
1270 }
1271
1272 #[tokio::test]
1273 async fn download_single_full_block_with_access_lists_uses_requested_requirement() {
1274 let client = FullBlockWithAccessListsClient::default();
1275 let body = BlockBody::default();
1276 let bal = Bytes::from_static(&[EMPTY_LIST_CODE]);
1277 let header = sealed_header_with_access_list_hash(&bal);
1278 client.insert(header.clone(), body.clone(), bal.clone());
1279
1280 let requirement = Arc::clone(&client.last_access_list_requirement);
1281 let client = FullBlockClient::test_client(client);
1282
1283 let received = client
1284 .get_full_block_with_access_lists_with_requirement(
1285 header.hash(),
1286 BalRequirement::Mandatory,
1287 )
1288 .await;
1289
1290 let expected_raw_bal = RawBal::from(bal);
1291 assert_eq!(received.block(), &SealedBlock::from_sealed_parts(header, body));
1292 assert_eq!(received.data().as_ref(), Some(&expected_raw_bal));
1293 assert_eq!(*requirement.lock(), Some(BalRequirement::Mandatory));
1294 }
1295
1296 #[tokio::test]
1297 async fn download_single_full_block_with_access_lists_waits_for_pending_access_lists() {
1298 let client = FullBlockWithAccessListsClient::default();
1299 client.set_access_list_pending_polls(1);
1300
1301 let body = BlockBody::default();
1302 let bal = Bytes::from_static(&[EMPTY_LIST_CODE]);
1303 let header = sealed_header_with_access_list_hash(&bal);
1304 client.insert(header.clone(), body.clone(), bal.clone());
1305
1306 let request_count = Arc::clone(&client.access_list_requests);
1307 let client = FullBlockClient::test_client(client);
1308
1309 let received =
1310 timeout(Duration::from_secs(1), client.get_full_block_with_access_lists(header.hash()))
1311 .await
1312 .expect("access list request should complete");
1313
1314 let expected_raw_bal = RawBal::from(bal);
1315 assert_eq!(received.block(), &SealedBlock::from_sealed_parts(header, body));
1316 assert_eq!(received.data().as_ref(), Some(&expected_raw_bal));
1317 assert_eq!(request_count.load(Ordering::SeqCst), 1);
1318 }
1319
1320 #[tokio::test]
1321 async fn download_single_full_block_with_access_lists_rejects_wrong_hash() {
1322 let client = FullBlockWithAccessListsClient::default();
1323 let body = BlockBody::default();
1324 let expected_bal = Bytes::from_static(&[EMPTY_LIST_CODE]);
1325 let wrong_bal = Bytes::from_static(&[0xc1, 0x01]);
1326 let header = sealed_header_with_access_list_hash(&expected_bal);
1327 client.insert(header.clone(), body.clone(), wrong_bal);
1328
1329 let bad_messages = Arc::clone(&client.bad_messages);
1330 let client = FullBlockClient::test_client(client);
1331
1332 let received =
1333 timeout(Duration::from_secs(1), client.get_full_block_with_access_lists(header.hash()))
1334 .await
1335 .expect("block request should complete without access lists");
1336
1337 assert_eq!(received.block(), &SealedBlock::from_sealed_parts(header, body));
1338 assert!(received.data().is_none());
1339 assert_eq!(bad_messages.load(Ordering::SeqCst), 1);
1340 }
1341
1342 #[tokio::test]
1343 async fn download_single_full_block_with_access_lists_treats_none_as_unavailable() {
1344 let client = FullBlockWithAccessListsClient::default();
1345 let body = BlockBody::default();
1346 let expected_bal = Bytes::from_static(&[0xc1, 0x01]);
1347 let header = sealed_header_with_access_list_hash(&expected_bal);
1348 client.inner.insert(header.clone(), body.clone());
1349
1350 let bad_messages = Arc::clone(&client.bad_messages);
1351 let client = FullBlockClient::test_client(client);
1352
1353 let received =
1354 timeout(Duration::from_secs(1), client.get_full_block_with_access_lists(header.hash()))
1355 .await
1356 .expect("block request should complete without access lists");
1357
1358 assert_eq!(received.block(), &SealedBlock::from_sealed_parts(header, body));
1359 assert!(received.data().is_none());
1360 assert_eq!(bad_messages.load(Ordering::SeqCst), 0);
1361 }
1362
1363 #[tokio::test]
1364 async fn download_single_full_block_with_access_lists_rejects_wrong_empty_list() {
1365 let client = FullBlockWithAccessListsClient::default();
1366 let body = BlockBody::default();
1367 let expected_bal = Bytes::from_static(&[0xc1, 0x01]);
1368 let wrong_empty_bal = Bytes::from_static(&[EMPTY_LIST_CODE]);
1369 let header = sealed_header_with_access_list_hash(&expected_bal);
1370 client.insert(header.clone(), body.clone(), wrong_empty_bal);
1371
1372 let bad_messages = Arc::clone(&client.bad_messages);
1373 let client = FullBlockClient::test_client(client);
1374
1375 let received =
1376 timeout(Duration::from_secs(1), client.get_full_block_with_access_lists(header.hash()))
1377 .await
1378 .expect("block request should complete without access lists");
1379
1380 assert_eq!(received.block(), &SealedBlock::from_sealed_parts(header, body));
1381 assert!(received.data().is_none());
1382 assert_eq!(bad_messages.load(Ordering::SeqCst), 1);
1383 }
1384
1385 #[tokio::test]
1386 async fn download_single_full_block_with_access_lists_returns_none_after_empty_response() {
1387 let client = FullBlockWithAccessListsClient::default();
1388 client.empty_first_response.store(true, Ordering::SeqCst);
1389
1390 let body = BlockBody::default();
1391 let bal = Bytes::from_static(&[EMPTY_LIST_CODE]);
1392 let header = sealed_header_with_access_list_hash(&bal);
1393 client.insert(header.clone(), body.clone(), bal.clone());
1394
1395 let request_count = Arc::clone(&client.access_list_requests);
1396 let bad_messages = Arc::clone(&client.bad_messages);
1397 let client = FullBlockClient::test_client(client);
1398
1399 let received =
1400 timeout(Duration::from_secs(1), client.get_full_block_with_access_lists(header.hash()))
1401 .await
1402 .expect("block request should complete without access lists");
1403
1404 assert_eq!(received.block(), &SealedBlock::from_sealed_parts(header, body));
1405 assert!(received.data().is_none());
1406 assert_eq!(request_count.load(Ordering::SeqCst), 1);
1407 assert_eq!(bad_messages.load(Ordering::SeqCst), 0);
1408 }
1409
1410 #[tokio::test]
1411 async fn download_single_full_block_with_access_lists_returns_block_when_unavailable() {
1412 let client = FullBlockWithAccessListsClient::default();
1413 client.set_access_lists_unsupported(true);
1414
1415 let body = BlockBody::default();
1416 let bal = Bytes::from_static(&[EMPTY_LIST_CODE]);
1417 let header = sealed_header_with_access_list_hash(&bal);
1418 client.insert(header.clone(), body.clone(), bal);
1419
1420 let request_count = Arc::clone(&client.access_list_requests);
1421 let requirement = Arc::clone(&client.last_access_list_requirement);
1422 let client = FullBlockClient::test_client(client);
1423
1424 let received =
1425 timeout(Duration::from_secs(1), client.get_full_block_with_access_lists(header.hash()))
1426 .await
1427 .expect("block request should complete without access lists");
1428
1429 assert_eq!(received.block(), &SealedBlock::from_sealed_parts(header, body));
1430 assert!(received.data().is_none());
1431 assert_eq!(request_count.load(Ordering::SeqCst), 1);
1432 assert_eq!(
1433 *requirement.lock(),
1434 Some(BalRequirement::Optional),
1435 "single block BAL lookup should be best-effort"
1436 );
1437 }
1438
1439 fn insert_headers_into_client(
1441 client: &TestFullBlockClient,
1442 range: Range<usize>,
1443 ) -> (SealedHeader, BlockBody) {
1444 let mut sealed_header: SealedHeader = SealedHeader::default();
1445 let body = BlockBody::default();
1446 for _ in range {
1447 let (mut header, hash) = sealed_header.split();
1448 header.parent_hash = hash;
1450 header.number += 1;
1451
1452 sealed_header = SealedHeader::seal_slow(header);
1453
1454 client.insert(sealed_header.clone(), body.clone());
1455 }
1456
1457 (sealed_header, body)
1458 }
1459
1460 #[derive(Clone, Debug)]
1461 struct FullBlockWithAccessListsClient {
1462 inner: TestFullBlockClient,
1463 access_lists: Arc<Mutex<B256Map<Bytes>>>,
1464 access_list_requests: Arc<AtomicUsize>,
1465 access_list_soft_limit: Arc<AtomicUsize>,
1466 access_list_pending_polls: Arc<AtomicUsize>,
1467 extra_access_list_entries: Arc<AtomicUsize>,
1468 unsupported_access_lists: Arc<AtomicBool>,
1469 last_access_list_requirement: Arc<Mutex<Option<BalRequirement>>>,
1470 bad_messages: Arc<AtomicUsize>,
1471 empty_first_response: Arc<AtomicBool>,
1472 }
1473
1474 impl Default for FullBlockWithAccessListsClient {
1475 fn default() -> Self {
1476 Self {
1477 inner: TestFullBlockClient::default(),
1478 access_lists: Arc::new(Mutex::new(B256Map::default())),
1479 access_list_requests: Arc::new(AtomicUsize::new(0)),
1480 access_list_soft_limit: Arc::new(AtomicUsize::new(usize::MAX)),
1481 access_list_pending_polls: Arc::new(AtomicUsize::new(0)),
1482 extra_access_list_entries: Arc::new(AtomicUsize::new(0)),
1483 unsupported_access_lists: Arc::new(AtomicBool::new(false)),
1484 last_access_list_requirement: Arc::new(Mutex::new(None)),
1485 bad_messages: Arc::new(AtomicUsize::new(0)),
1486 empty_first_response: Arc::new(AtomicBool::new(false)),
1487 }
1488 }
1489 }
1490
1491 impl FullBlockWithAccessListsClient {
1492 fn insert(&self, header: SealedHeader, body: BlockBody, bal: Bytes) {
1493 self.inner.insert(header.clone(), body);
1494 self.access_lists.lock().insert(header.hash(), bal);
1495 }
1496
1497 fn set_access_list_soft_limit(&self, limit: usize) {
1498 self.access_list_soft_limit.store(limit, Ordering::SeqCst);
1499 }
1500
1501 fn set_access_list_pending_polls(&self, polls: usize) {
1502 self.access_list_pending_polls.store(polls, Ordering::SeqCst);
1503 }
1504
1505 fn set_extra_access_list_entries(&self, count: usize) {
1506 self.extra_access_list_entries.store(count, Ordering::SeqCst);
1507 }
1508
1509 fn set_access_lists_unsupported(&self, unsupported: bool) {
1510 self.unsupported_access_lists.store(unsupported, Ordering::SeqCst);
1511 }
1512 }
1513
1514 fn insert_headers_with_access_lists_into_client(
1516 client: &FullBlockWithAccessListsClient,
1517 range: Range<usize>,
1518 ) -> (SealedHeader, BlockBody) {
1519 let mut sealed_header: SealedHeader = SealedHeader::default();
1520 let body = BlockBody::default();
1521 for block_idx in range {
1522 let (mut header, hash) = sealed_header.split();
1523 header.parent_hash = hash;
1524 header.number += 1;
1525 let bal = Bytes::from(vec![0xc1, block_idx as u8]);
1526 header.block_access_list_hash = Some(keccak256(bal.as_ref()));
1527
1528 sealed_header = SealedHeader::seal_slow(header);
1529
1530 client.insert(sealed_header.clone(), body.clone(), bal);
1531 }
1532
1533 (sealed_header, body)
1534 }
1535
1536 impl DownloadClient for FullBlockWithAccessListsClient {
1537 fn report_bad_message(&self, peer_id: PeerId) {
1538 self.bad_messages.fetch_add(1, Ordering::SeqCst);
1539 self.inner.report_bad_message(peer_id);
1540 }
1541
1542 fn num_connected_peers(&self) -> usize {
1543 self.inner.num_connected_peers()
1544 }
1545 }
1546
1547 impl HeadersClient for FullBlockWithAccessListsClient {
1548 type Header = <TestFullBlockClient as HeadersClient>::Header;
1549 type Output = <TestFullBlockClient as HeadersClient>::Output;
1550
1551 fn get_headers_with_priority(
1552 &self,
1553 request: HeadersRequest,
1554 priority: Priority,
1555 ) -> Self::Output {
1556 self.inner.get_headers_with_priority(request, priority)
1557 }
1558 }
1559
1560 impl BodiesClient for FullBlockWithAccessListsClient {
1561 type Body = <TestFullBlockClient as BodiesClient>::Body;
1562 type Output = <TestFullBlockClient as BodiesClient>::Output;
1563
1564 fn get_block_bodies_with_priority_and_range_hint(
1565 &self,
1566 hashes: Vec<B256>,
1567 priority: Priority,
1568 range_hint: Option<RangeInclusive<u64>>,
1569 ) -> Self::Output {
1570 self.inner.get_block_bodies_with_priority_and_range_hint(hashes, priority, range_hint)
1571 }
1572 }
1573
1574 struct MaybePendingAccessLists {
1575 response: Option<PeerRequestResult<BlockAccessLists>>,
1576 pending_polls: usize,
1577 }
1578
1579 impl MaybePendingAccessLists {
1580 const fn new(response: PeerRequestResult<BlockAccessLists>, pending_polls: usize) -> Self {
1581 Self { response: Some(response), pending_polls }
1582 }
1583 }
1584
1585 impl std::future::Future for MaybePendingAccessLists {
1586 type Output = PeerRequestResult<BlockAccessLists>;
1587
1588 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1589 if self.pending_polls > 0 {
1590 self.pending_polls -= 1;
1591 cx.waker().wake_by_ref();
1592 return Poll::Pending
1593 }
1594
1595 Poll::Ready(self.response.take().expect("future polled after completion"))
1596 }
1597 }
1598
1599 impl BlockAccessListsClient for FullBlockWithAccessListsClient {
1600 type Output = MaybePendingAccessLists;
1601
1602 fn get_block_access_lists_with_priority_and_requirement(
1603 &self,
1604 hashes: Vec<B256>,
1605 _priority: Priority,
1606 requirement: BalRequirement,
1607 ) -> Self::Output {
1608 self.access_list_requests.fetch_add(1, Ordering::SeqCst);
1609 *self.last_access_list_requirement.lock() = Some(requirement);
1610 let pending_polls = self.access_list_pending_polls.swap(0, Ordering::SeqCst);
1611
1612 if self.unsupported_access_lists.load(Ordering::SeqCst) {
1613 return MaybePendingAccessLists::new(
1614 Err(RequestError::UnsupportedCapability),
1615 pending_polls,
1616 )
1617 }
1618
1619 if self.empty_first_response.swap(false, Ordering::SeqCst) {
1620 return MaybePendingAccessLists::new(
1621 Ok(WithPeerId::new(PeerId::random(), BlockAccessLists(Vec::new()))),
1622 pending_polls,
1623 )
1624 }
1625
1626 let mut access_lists: Vec<_> = hashes
1627 .into_iter()
1628 .take(self.access_list_soft_limit.load(Ordering::SeqCst))
1629 .map(|hash| self.access_lists.lock().get(&hash).cloned())
1630 .collect();
1631 for _ in 0..self.extra_access_list_entries.load(Ordering::SeqCst) {
1632 access_lists.push(None);
1633 }
1634
1635 MaybePendingAccessLists::new(
1636 Ok(WithPeerId::new(PeerId::random(), BlockAccessLists(access_lists))),
1637 pending_polls,
1638 )
1639 }
1640 }
1641
1642 impl BlockClient for FullBlockWithAccessListsClient {
1643 type Block = reth_ethereum_primitives::Block;
1644 }
1645
1646 #[derive(Clone, Debug)]
1647 struct FailingBodiesClient {
1648 inner: TestFullBlockClient,
1649 fail_on: usize,
1650 body_requests: Arc<AtomicUsize>,
1651 }
1652
1653 impl FailingBodiesClient {
1654 fn new(inner: TestFullBlockClient, fail_on: usize) -> Self {
1655 Self { inner, fail_on, body_requests: Arc::new(AtomicUsize::new(0)) }
1656 }
1657 }
1658
1659 impl DownloadClient for FailingBodiesClient {
1660 fn report_bad_message(&self, peer_id: PeerId) {
1661 self.inner.report_bad_message(peer_id);
1662 }
1663
1664 fn num_connected_peers(&self) -> usize {
1665 self.inner.num_connected_peers()
1666 }
1667 }
1668
1669 impl HeadersClient for FailingBodiesClient {
1670 type Header = <TestFullBlockClient as HeadersClient>::Header;
1671 type Output = <TestFullBlockClient as HeadersClient>::Output;
1672
1673 fn get_headers_with_priority(
1674 &self,
1675 request: HeadersRequest,
1676 priority: Priority,
1677 ) -> Self::Output {
1678 self.inner.get_headers_with_priority(request, priority)
1679 }
1680 }
1681
1682 impl BodiesClient for FailingBodiesClient {
1683 type Body = <TestFullBlockClient as BodiesClient>::Body;
1684 type Output = <TestFullBlockClient as BodiesClient>::Output;
1685
1686 fn get_block_bodies_with_priority_and_range_hint(
1687 &self,
1688 hashes: Vec<B256>,
1689 priority: Priority,
1690 range_hint: Option<RangeInclusive<u64>>,
1691 ) -> Self::Output {
1692 let attempt = self.body_requests.fetch_add(1, Ordering::SeqCst);
1693 if attempt == self.fail_on {
1694 return futures::future::ready(Err(RequestError::Timeout))
1695 }
1696
1697 self.inner.get_block_bodies_with_priority_and_range_hint(hashes, priority, range_hint)
1698 }
1699 }
1700
1701 impl BlockClient for FailingBodiesClient {
1702 type Block = reth_ethereum_primitives::Block;
1703 }
1704
1705 #[tokio::test]
1706 async fn download_full_block_range() {
1707 let client = TestFullBlockClient::default();
1708 let (header, body) = insert_headers_into_client(&client, 0..50);
1709 let client = FullBlockClient::test_client(client);
1710
1711 let received = client.get_full_block_range(header.hash(), 1).await;
1712 let received = received.first().expect("response should include a block");
1713 assert_eq!(*received, SealedBlock::from_sealed_parts(header.clone(), body));
1714
1715 let received = client.get_full_block_range(header.hash(), 10).await;
1716 assert_eq!(received.len(), 10);
1717 for (i, block) in received.iter().enumerate() {
1718 let expected_number = header.number - i as u64;
1719 assert_eq!(block.number, expected_number);
1720 }
1721 }
1722
1723 #[tokio::test]
1724 async fn download_full_block_range_over_soft_limit() {
1725 let client = TestFullBlockClient::default();
1727 let (header, body) = insert_headers_into_client(&client, 0..50);
1728 let client = FullBlockClient::test_client(client);
1729
1730 let received = client.get_full_block_range(header.hash(), 1).await;
1731 let received = received.first().expect("response should include a block");
1732 assert_eq!(*received, SealedBlock::from_sealed_parts(header.clone(), body));
1733
1734 let received = client.get_full_block_range(header.hash(), 50).await;
1735 assert_eq!(received.len(), 50);
1736 for (i, block) in received.iter().enumerate() {
1737 let expected_number = header.number - i as u64;
1738 assert_eq!(block.number, expected_number);
1739 }
1740 }
1741
1742 #[tokio::test]
1743 async fn download_full_block_range_retries_after_body_error() {
1744 let mut client = TestFullBlockClient::default();
1745 client.set_soft_limit(2);
1746 let (header, _) = insert_headers_into_client(&client, 0..3);
1747
1748 let client = FailingBodiesClient::new(client, 1);
1749 let body_requests = Arc::clone(&client.body_requests);
1750 let client = FullBlockClient::test_client(client);
1751
1752 let received =
1753 timeout(Duration::from_secs(1), client.get_full_block_range(header.hash(), 3))
1754 .await
1755 .expect("body request retry should complete");
1756
1757 assert_eq!(received.len(), 3);
1758 assert_eq!(body_requests.load(Ordering::SeqCst), 3);
1759 }
1760
1761 #[tokio::test]
1762 async fn download_full_block_range_with_access_lists() {
1763 let client = FullBlockWithAccessListsClient::default();
1764 let (header, _) = insert_headers_with_access_lists_into_client(&client, 0..3);
1765
1766 let access_lists = Arc::clone(&client.access_lists);
1767 let request_count = Arc::clone(&client.access_list_requests);
1768 let requirement = Arc::clone(&client.last_access_list_requirement);
1769 let client = FullBlockClient::test_client(client);
1770
1771 let response = timeout(
1772 Duration::from_secs(1),
1773 client.get_full_block_range_with_optional_access_lists(header.hash(), 3),
1774 )
1775 .await
1776 .expect("range request should complete");
1777
1778 let blocks = response;
1779 assert_eq!(blocks.len(), 3);
1780 let expected = {
1781 let bals = access_lists.lock();
1782 blocks
1783 .iter()
1784 .map(|block| {
1785 let bal = bals.get(&block.block().hash()).cloned().expect("access list exists");
1786 Some(RawBal::from(bal))
1787 })
1788 .collect::<Vec<_>>()
1789 };
1790 assert_eq!(range_access_lists(&blocks), expected);
1791 assert_eq!(request_count.load(Ordering::SeqCst), 1);
1792 assert_eq!(*requirement.lock(), Some(BalRequirement::Optional));
1793 }
1794
1795 #[tokio::test]
1796 async fn download_full_block_range_with_access_lists_returns_none_for_empty_response() {
1797 let client = FullBlockWithAccessListsClient::default();
1798 client.empty_first_response.store(true, Ordering::SeqCst);
1799 let (header, _) = insert_headers_with_access_lists_into_client(&client, 0..3);
1800
1801 let request_count = Arc::clone(&client.access_list_requests);
1802 let client = FullBlockClient::test_client(client);
1803
1804 let response = timeout(
1805 Duration::from_secs(1),
1806 client.get_full_block_range_with_optional_access_lists(header.hash(), 3),
1807 )
1808 .await
1809 .expect("range request should complete without access lists");
1810
1811 let blocks = response;
1812 assert_eq!(blocks.len(), 3);
1813 assert_eq!(range_access_lists(&blocks), vec![None; blocks.len()]);
1814 assert_eq!(request_count.load(Ordering::SeqCst), 1);
1815 }
1816
1817 #[tokio::test]
1818 async fn download_full_block_range_with_access_lists_uses_requested_requirement() {
1819 let client = FullBlockWithAccessListsClient::default();
1820 let (header, _) = insert_headers_with_access_lists_into_client(&client, 0..3);
1821
1822 let requirement = Arc::clone(&client.last_access_list_requirement);
1823 let client = FullBlockClient::test_client(client);
1824
1825 let blocks = timeout(
1826 Duration::from_secs(1),
1827 client.get_full_block_range_with_optional_access_lists_with_requirement(
1828 header.hash(),
1829 3,
1830 BalRequirement::Mandatory,
1831 ),
1832 )
1833 .await
1834 .expect("range request should complete");
1835
1836 assert_eq!(blocks.len(), 3);
1837 assert_eq!(*requirement.lock(), Some(BalRequirement::Mandatory));
1838 }
1839
1840 #[tokio::test]
1841 async fn download_full_block_range_with_access_lists_preserves_short_response() {
1842 let client = FullBlockWithAccessListsClient::default();
1843 client.set_access_list_soft_limit(2);
1844 let (header, _) = insert_headers_with_access_lists_into_client(&client, 0..5);
1845
1846 let access_lists = Arc::clone(&client.access_lists);
1847 let request_count = Arc::clone(&client.access_list_requests);
1848 let client = FullBlockClient::test_client(client);
1849
1850 let blocks = timeout(
1851 Duration::from_secs(1),
1852 client.get_full_block_range_with_optional_access_lists(header.hash(), 5),
1853 )
1854 .await
1855 .expect("range request should complete without access lists");
1856
1857 assert_eq!(blocks.len(), 5);
1858 let expected = {
1859 let bals = access_lists.lock();
1860 blocks
1861 .iter()
1862 .enumerate()
1863 .map(|(idx, block)| {
1864 if idx >= 2 {
1865 return None
1866 }
1867
1868 let bal = bals.get(&block.block().hash()).cloned().expect("access list exists");
1869 Some(RawBal::from(bal))
1870 })
1871 .collect::<Vec<_>>()
1872 };
1873 assert_eq!(range_access_lists(&blocks), expected);
1874 assert_eq!(request_count.load(Ordering::SeqCst), 1);
1875 }
1876
1877 #[tokio::test]
1878 async fn download_full_block_range_with_access_lists_preserves_unavailable_entries() {
1879 let client = FullBlockWithAccessListsClient::default();
1880 let (header, _) = insert_headers_with_access_lists_into_client(&client, 0..3);
1881 client.access_lists.lock().remove(&header.hash());
1882
1883 let access_lists = Arc::clone(&client.access_lists);
1884 let bad_messages = Arc::clone(&client.bad_messages);
1885 let client = FullBlockClient::test_client(client);
1886
1887 let blocks = timeout(
1888 Duration::from_secs(1),
1889 client.get_full_block_range_with_optional_access_lists(header.hash(), 3),
1890 )
1891 .await
1892 .expect("range request should complete");
1893
1894 assert_eq!(blocks.len(), 3);
1895 let expected = {
1896 let bals = access_lists.lock();
1897 blocks
1898 .iter()
1899 .map(|block| {
1900 if block.block().hash() == header.hash() {
1901 return None
1902 }
1903
1904 let bal = bals.get(&block.block().hash()).cloned().expect("access list exists");
1905 Some(RawBal::from(bal))
1906 })
1907 .collect::<Vec<_>>()
1908 };
1909 assert_eq!(range_access_lists(&blocks), expected);
1910 assert_eq!(bad_messages.load(Ordering::SeqCst), 0);
1911 }
1912
1913 #[tokio::test]
1914 async fn download_full_block_range_with_access_lists_returns_none_when_unavailable() {
1915 let client = FullBlockWithAccessListsClient::default();
1916 client.set_access_lists_unsupported(true);
1917 let (header, _) = insert_headers_with_access_lists_into_client(&client, 0..3);
1918
1919 let request_count = Arc::clone(&client.access_list_requests);
1920 let client = FullBlockClient::test_client(client);
1921
1922 let blocks = timeout(
1923 Duration::from_secs(1),
1924 client.get_full_block_range_with_optional_access_lists(header.hash(), 3),
1925 )
1926 .await
1927 .expect("range request should complete without access lists");
1928
1929 assert_eq!(blocks.len(), 3);
1930 assert_eq!(range_access_lists(&blocks), vec![None; blocks.len()]);
1931 assert_eq!(request_count.load(Ordering::SeqCst), 1);
1932 }
1933
1934 #[tokio::test]
1935 async fn download_full_block_range_with_access_lists_ignores_long_response() {
1936 let client = FullBlockWithAccessListsClient::default();
1937 client.set_extra_access_list_entries(1);
1938 let (header, _) = insert_headers_with_access_lists_into_client(&client, 0..3);
1939
1940 let request_count = Arc::clone(&client.access_list_requests);
1941 let bad_messages = Arc::clone(&client.bad_messages);
1942 let client = FullBlockClient::test_client(client);
1943
1944 let blocks = timeout(
1945 Duration::from_secs(1),
1946 client.get_full_block_range_with_optional_access_lists(header.hash(), 3),
1947 )
1948 .await
1949 .expect("range request should complete without access lists");
1950
1951 assert_eq!(blocks.len(), 3);
1952 assert_eq!(range_access_lists(&blocks), vec![None; blocks.len()]);
1953 assert_eq!(request_count.load(Ordering::SeqCst), 1);
1954 assert_eq!(bad_messages.load(Ordering::SeqCst), 0);
1955 }
1956
1957 #[tokio::test]
1958 async fn download_full_block_range_with_access_lists_rejects_wrong_hash() {
1959 let client = FullBlockWithAccessListsClient::default();
1960 let (header, _) = insert_headers_with_access_lists_into_client(&client, 0..3);
1961 client.access_lists.lock().insert(header.hash(), Bytes::from_static(&[0xc1, 0x7f]));
1962
1963 let bad_messages = Arc::clone(&client.bad_messages);
1964 let client = FullBlockClient::test_client(client);
1965
1966 let blocks = timeout(
1967 Duration::from_secs(1),
1968 client.get_full_block_range_with_optional_access_lists(header.hash(), 3),
1969 )
1970 .await
1971 .expect("range request should complete without access lists");
1972
1973 assert_eq!(blocks.len(), 3);
1974 assert_eq!(range_access_lists(&blocks), vec![None; blocks.len()]);
1975 assert_eq!(bad_messages.load(Ordering::SeqCst), 1);
1976 }
1977
1978 #[tokio::test]
1979 async fn download_full_block_range_with_access_lists_preserves_valid_prefix_until_wrong_hash() {
1980 let client = FullBlockWithAccessListsClient::default();
1981 let (header, _) = insert_headers_with_access_lists_into_client(&client, 0..3);
1982 let first_bal =
1983 client.access_lists.lock().get(&header.hash()).cloned().expect("access list exists");
1984 let second_hash = header.parent_hash;
1985 client.access_lists.lock().insert(second_hash, Bytes::from_static(&[0xc1, 0x7f]));
1986
1987 let bad_messages = Arc::clone(&client.bad_messages);
1988 let client = FullBlockClient::test_client(client);
1989
1990 let blocks = timeout(
1991 Duration::from_secs(1),
1992 client.get_full_block_range_with_optional_access_lists(header.hash(), 3),
1993 )
1994 .await
1995 .expect("range request should complete without unvalidated access lists");
1996
1997 assert_eq!(blocks.len(), 3);
1998 assert_eq!(blocks[1].block().hash(), second_hash);
1999 assert_eq!(range_access_lists(&blocks), vec![Some(RawBal::from(first_bal)), None, None]);
2000 assert_eq!(bad_messages.load(Ordering::SeqCst), 1);
2001 }
2002
2003 #[tokio::test]
2004 async fn download_full_block_range_with_invalid_header() {
2005 let client = TestFullBlockClient::default();
2006 let range_length: usize = 3;
2007 let (header, _) = insert_headers_into_client(&client, 0..range_length);
2008
2009 let test_consensus = reth_consensus::test_utils::TestConsensus::default();
2010 test_consensus.set_fail_validation(true);
2011 test_consensus.set_fail_body_against_header(false);
2012 let client = FullBlockClient::new(client, Arc::new(test_consensus));
2013
2014 let received = client.get_full_block_range(header.hash(), range_length as u64).await;
2015
2016 assert_eq!(received.len(), range_length);
2017 for (i, block) in received.iter().enumerate() {
2018 let expected_number = header.number - i as u64;
2019 assert_eq!(block.number, expected_number);
2020 }
2021 }
2022}