1use super::headers::client::HeadersRequest;
2use crate::{
3 block_access_lists::client::{BalRequirement, BlockAccessListsClient},
4 bodies::client::{BodiesClient, SingleBodyRequest},
5 download::DownloadClient,
6 error::PeerRequestResult,
7 headers::client::{HeadersClient, SingleHeaderRequest},
8 priority::Priority,
9 BlockClient,
10};
11use alloy_consensus::BlockHeader;
12use alloy_eip7928::bal::RawBal;
13use alloy_primitives::{Bytes, Sealable, B256};
14use core::marker::PhantomData;
15use futures::FutureExt;
16use reth_consensus::Consensus;
17use reth_eth_wire_types::{
18 BlockAccessLists, EthNetworkPrimitives, HeadersDirection, NetworkPrimitives,
19};
20use reth_network_peers::{PeerId, WithPeerId};
21use reth_primitives_traits::{Block, SealedBlock, SealedBlockWith, SealedHeader};
22use std::{
23 cmp::Reverse,
24 collections::{HashMap, VecDeque},
25 fmt::Debug,
26 hash::Hash,
27 ops::RangeInclusive,
28 pin::Pin,
29 sync::Arc,
30 task::{ready, Context, Poll},
31};
32use tracing::{debug, trace};
33
34pub type SealedBlockWithAccessList<B> = SealedBlockWith<B, Option<RawBal>>;
36
37#[derive(Debug, Clone)]
39pub struct FullBlockClient<Client>
40where
41 Client: BlockClient,
42{
43 client: Client,
44 consensus: Arc<dyn Consensus<Client::Block>>,
45}
46
47impl<Client> FullBlockClient<Client>
48where
49 Client: BlockClient,
50{
51 pub fn new(client: Client, consensus: Arc<dyn Consensus<Client::Block>>) -> Self {
53 Self { client, consensus }
54 }
55
56 #[cfg(any(test, feature = "test-utils"))]
58 pub fn test_client(client: Client) -> Self {
59 Self::new(client, Arc::new(reth_consensus::test_utils::TestConsensus::default()))
60 }
61}
62
63impl<Client> FullBlockClient<Client>
64where
65 Client: BlockClient,
66{
67 pub fn get_full_block(&self, hash: B256) -> FetchFullBlockFuture<Client> {
74 FetchFullBlockFuture::new(self.client.clone(), self.consensus.clone(), hash)
75 }
76
77 pub fn get_full_block_range(
87 &self,
88 hash: B256,
89 count: u64,
90 ) -> FetchFullBlockRangeFuture<Client> {
91 let client = self.client.clone();
92 FetchFullBlockRangeFuture {
93 start_hash: hash,
94 count,
95 request: FullBlockRangeRequest {
96 headers: Some(client.get_headers(HeadersRequest::falling(hash.into(), count))),
97 bodies: None,
98 },
99 client,
100 headers: None,
101 pending_headers: VecDeque::new(),
102 bodies: HashMap::default(),
103 consensus: Arc::clone(&self.consensus),
104 }
105 }
106}
107
108impl<Client> FullBlockClient<Client>
109where
110 Client: BlockClient + BlockAccessListsClient,
111{
112 pub fn get_full_block_with_access_lists(
120 &self,
121 hash: B256,
122 ) -> FetchFullBlockWithBalFuture<Client> {
123 self.get_full_block_with_access_lists_with_requirement(hash, BalRequirement::default())
124 }
125
126 pub fn get_full_block_with_access_lists_with_requirement(
134 &self,
135 hash: B256,
136 requirement: BalRequirement,
137 ) -> FetchFullBlockWithBalFuture<Client> {
138 let client = self.client.clone();
139 FetchFullBlockWithBalFuture {
140 block: FetchFullBlockFuture::new(client.clone(), self.consensus.clone(), hash),
141 block_result: None,
142 bal_request_state: BalRequestState::Pending(
143 client.get_block_access_lists_with_requirement(vec![hash], requirement),
144 ),
145 }
146 }
147
148 pub fn get_full_block_range_with_optional_access_lists(
155 &self,
156 hash: B256,
157 count: u64,
158 ) -> FetchFullBlockRangeWithBalFuture<Client> {
159 self.get_full_block_range_with_optional_access_lists_with_requirement(
160 hash,
161 count,
162 BalRequirement::default(),
163 )
164 }
165
166 pub fn get_full_block_range_with_optional_access_lists_with_requirement(
173 &self,
174 hash: B256,
175 count: u64,
176 requirement: BalRequirement,
177 ) -> FetchFullBlockRangeWithBalFuture<Client> {
178 let client = self.client.clone();
179 FetchFullBlockRangeWithBalFuture {
180 blocks: self.get_full_block_range(hash, count),
181 client,
182 block_result: None,
183 access_lists: OptionalBlockAccessListsState::WaitingForBlocks { requirement },
184 }
185 }
186}
187
188#[must_use = "futures do nothing unless polled"]
193pub struct FetchFullBlockFuture<Client>
194where
195 Client: BlockClient,
196{
197 client: Client,
198 consensus: Arc<dyn Consensus<Client::Block>>,
199 hash: B256,
200 request: FullBlockRequest<Client>,
201 header: Option<SealedHeader<Client::Header>>,
202 body: Option<BodyResponse<Client::Body>>,
203}
204
205impl<Client> FetchFullBlockFuture<Client>
206where
207 Client: BlockClient,
208{
209 fn new(client: Client, consensus: Arc<dyn Consensus<Client::Block>>, hash: B256) -> Self {
210 Self {
211 hash,
212 consensus,
213 request: FullBlockRequest {
214 header: Some(client.get_header(hash.into())),
215 body: Some(client.get_block_body(hash)),
216 },
217 client,
218 header: None,
219 body: None,
220 }
221 }
222
223 pub const fn hash(&self) -> &B256 {
225 &self.hash
226 }
227
228 pub fn block_number(&self) -> Option<u64> {
230 self.header.as_ref().map(|h| h.number())
231 }
232
233 fn take_block(&mut self) -> Option<SealedBlock<Client::Block>> {
235 if self.header.is_none() || self.body.is_none() {
236 return None
237 }
238
239 let header = self.header.take().unwrap();
240 let resp = self.body.take().unwrap();
241 match resp {
242 BodyResponse::Validated(body) => Some(SealedBlock::from_sealed_parts(header, body)),
243 BodyResponse::PendingValidation(resp) => {
244 if let Err(err) = self.consensus.validate_body_against_header(resp.data(), &header)
246 {
247 debug!(target: "downloaders", %err, hash=?header.hash(), "Received wrong body");
248 self.client.report_bad_message(resp.peer_id());
249 self.header = Some(header);
250 self.request.body = Some(self.client.get_block_body(self.hash));
251 return None
252 }
253 Some(SealedBlock::from_sealed_parts(header, resp.into_data()))
254 }
255 }
256 }
257
258 fn on_block_response(&mut self, resp: WithPeerId<Client::Body>) {
259 if let Some(ref header) = self.header {
260 if let Err(err) = self.consensus.validate_body_against_header(resp.data(), header) {
261 debug!(target: "downloaders", %err, hash=?header.hash(), "Received wrong body");
262 self.client.report_bad_message(resp.peer_id());
263 return
264 }
265 self.body = Some(BodyResponse::Validated(resp.into_data()));
266 return
267 }
268 self.body = Some(BodyResponse::PendingValidation(resp));
269 }
270}
271
272impl<Client> Future for FetchFullBlockFuture<Client>
273where
274 Client: BlockClient<Header: BlockHeader + Sealable> + 'static,
275{
276 type Output = SealedBlock<Client::Block>;
277
278 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
279 let this = self.get_mut();
280
281 let mut budget = 4;
283
284 loop {
285 match ready!(this.request.poll(cx)) {
286 ResponseResult::Header(res) => {
287 match res {
288 Ok(maybe_header) => {
289 let (peer, maybe_header) =
290 maybe_header.map(|h| h.map(SealedHeader::seal_slow)).split();
291 if let Some(header) = maybe_header {
292 if header.hash() == this.hash {
293 this.header = Some(header);
294 } else {
295 debug!(target: "downloaders", expected=?this.hash, received=?header.hash(), "Received wrong header");
296 this.client.report_bad_message(peer)
298 }
299 }
300 }
301 Err(err) => {
302 debug!(target: "downloaders", %err, ?this.hash, "Header download failed");
303 }
304 }
305
306 if this.header.is_none() {
307 this.request.header = Some(this.client.get_header(this.hash.into()));
309 }
310 }
311 ResponseResult::Body(res) => {
312 match res {
313 Ok(maybe_body) => {
314 if let Some(body) = maybe_body.transpose() {
315 this.on_block_response(body);
316 }
317 }
318 Err(err) => {
319 debug!(target: "downloaders", %err, ?this.hash, "Body download failed");
320 }
321 }
322 if this.body.is_none() {
323 this.request.body = Some(this.client.get_block_body(this.hash));
325 }
326 }
327 }
328
329 if let Some(res) = this.take_block() {
330 return Poll::Ready(res)
331 }
332
333 budget -= 1;
335 if budget == 0 {
336 cx.waker().wake_by_ref();
338 return Poll::Pending
339 }
340 }
341 }
342}
343
344#[must_use = "futures do nothing unless polled"]
349pub struct FetchFullBlockWithBalFuture<Client>
350where
351 Client: BlockClient + BlockAccessListsClient,
352{
353 block: FetchFullBlockFuture<Client>,
354 block_result: Option<SealedBlock<Client::Block>>,
355 bal_request_state: BalRequestState<<Client as BlockAccessListsClient>::Output>,
356}
357
358impl<Client> FetchFullBlockWithBalFuture<Client>
359where
360 Client: BlockClient<Header: BlockHeader> + BlockAccessListsClient,
361{
362 pub const fn hash(&self) -> &B256 {
364 self.block.hash()
365 }
366}
367
368impl<Client> FetchFullBlockWithBalFuture<Client>
369where
370 Client: BlockClient<Header: BlockHeader + Sealable> + BlockAccessListsClient + 'static,
371{
372 pub fn block_number(&self) -> Option<u64> {
374 self.block_result.as_ref().map(|block| block.number()).or_else(|| self.block.block_number())
375 }
376
377 fn poll_bal_request(&mut self, cx: &mut Context<'_>) -> Poll<()> {
383 let res = match &mut self.bal_request_state {
384 BalRequestState::Pending(fut) => ready!(fut.poll_unpin(cx)),
385 BalRequestState::Ready(_) => return Poll::Ready(()),
386 };
387
388 match res {
389 Ok(bal) => {
390 let (peer, access_lists) = bal.split();
391 match access_lists.0.len() {
392 0 => self.bal_request_state = BalRequestState::Ready(None),
393 1 => {
394 let bal = access_lists.0.into_iter().next().expect("len checked");
395 self.bal_request_state =
396 BalRequestState::Ready(Some(WithPeerId::new(peer, bal)));
397 }
398 received => {
399 debug!(
400 target: "downloaders",
401 hash = ?self.block.hash(),
402 expected = 1,
403 received,
404 "Received wrong access list response",
405 );
406 self.block.client.report_bad_message(peer);
407 self.bal_request_state = BalRequestState::Ready(None);
408 }
409 }
410 }
411 Err(err) => {
412 debug!(
413 target: "downloaders",
414 %err,
415 hash = ?self.block.hash(),
416 "Access list download failed",
417 );
418 self.bal_request_state = BalRequestState::Ready(None);
419 }
420 }
421
422 Poll::Ready(())
423 }
424
425 fn take_block_and_access_lists(&mut self) -> Option<SealedBlockWithAccessList<Client::Block>> {
431 let BalRequestState::Ready(bal) = &mut self.bal_request_state else { return None };
432 let block = self.block_result.take()?;
433 let raw_bal =
434 bal.take().and_then(|bal| match seal_block_access_list_for_block(&block, bal) {
435 Ok(raw_bal) => raw_bal,
436 Err(peer) => {
437 self.block.client.report_bad_message(peer);
438 None
439 }
440 });
441 Some(SealedBlockWith::new(block, raw_bal))
442 }
443}
444
445impl<Client> Future for FetchFullBlockWithBalFuture<Client>
446where
447 Client: BlockClient<Header: BlockHeader + Sealable> + BlockAccessListsClient + 'static,
448{
449 type Output = SealedBlockWithAccessList<Client::Block>;
450
451 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
452 let this = self.get_mut();
453
454 if this.block_result.is_none() &&
455 let Poll::Ready(block) = this.block.poll_unpin(cx)
456 {
457 this.block_result = Some(block);
458 }
459
460 ready!(this.poll_bal_request(cx));
461
462 if let Some(res) = this.take_block_and_access_lists() {
463 return Poll::Ready(res)
464 }
465
466 Poll::Pending
467 }
468}
469
470impl<Client> Debug for FetchFullBlockWithBalFuture<Client>
471where
472 Client: BlockClient<Header: BlockHeader> + BlockAccessListsClient,
473{
474 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
475 f.debug_struct("FetchFullBlockWithBalFuture")
476 .field("hash", &self.block.hash())
477 .field("block_ready", &self.block_result.is_some())
478 .field("bal_request_ready", &self.bal_request_state.is_ready())
479 .finish()
480 }
481}
482
483enum BalRequestState<Req> {
485 Pending(Req),
486 Ready(Option<WithPeerId<Option<Bytes>>>),
487}
488
489impl<Req> BalRequestState<Req> {
490 const fn is_ready(&self) -> bool {
491 matches!(self, Self::Ready(_))
492 }
493}
494
495#[must_use = "futures do nothing unless polled"]
502#[expect(missing_debug_implementations)]
503pub struct FetchFullBlockRangeWithBalFuture<Client>
504where
505 Client: BlockClient + BlockAccessListsClient,
506{
507 blocks: FetchFullBlockRangeFuture<Client>,
508 client: Client,
509 block_result: Option<Vec<SealedBlock<Client::Block>>>,
510 access_lists: OptionalBlockAccessListsState<<Client as BlockAccessListsClient>::Output>,
511}
512
513impl<Client> FetchFullBlockRangeWithBalFuture<Client>
514where
515 Client: BlockClient<Header: Debug + BlockHeader + Sealable + Clone + Hash + Eq>
516 + BlockAccessListsClient,
517{
518 pub const fn start_hash(&self) -> B256 {
520 self.blocks.start_hash()
521 }
522
523 pub const fn count(&self) -> u64 {
525 self.blocks.count()
526 }
527
528 fn start_access_lists_request_if_possible(&mut self) {
529 let requirement = match &self.access_lists {
530 OptionalBlockAccessListsState::WaitingForBlocks { requirement } => *requirement,
531 OptionalBlockAccessListsState::Pending(_) | OptionalBlockAccessListsState::Ready(_) => {
532 return
533 }
534 };
535
536 let Some(blocks) = self.block_result.as_ref() else { return };
538 let hashes = blocks.iter().map(|block| block.hash()).collect::<Vec<_>>();
539 self.access_lists = OptionalBlockAccessListsState::Pending(
540 self.client.get_block_access_lists_with_requirement(hashes, requirement),
541 );
542 }
543
544 fn poll_access_lists(&mut self, cx: &mut Context<'_>) {
546 self.start_access_lists_request_if_possible();
547
548 let poll = match &mut self.access_lists {
549 OptionalBlockAccessListsState::Pending(fut) => fut.poll_unpin(cx),
550 OptionalBlockAccessListsState::WaitingForBlocks { .. } |
551 OptionalBlockAccessListsState::Ready(_) => return,
552 };
553
554 match poll {
555 Poll::Pending => {}
556 Poll::Ready(Ok(access_lists)) => {
557 self.access_lists = OptionalBlockAccessListsState::Ready(Some(access_lists));
558 }
559 Poll::Ready(Err(err)) => {
560 debug!(
561 target: "downloaders",
562 %err,
563 start_hash = ?self.blocks.start_hash(),
564 "Access list range download failed",
565 );
566
567 self.access_lists = OptionalBlockAccessListsState::Ready(None);
570 }
571 }
572 }
573
574 fn take_response(&mut self) -> Option<Vec<SealedBlockWithAccessList<Client::Block>>> {
576 let OptionalBlockAccessListsState::Ready(access_lists) = &mut self.access_lists else {
577 return None
578 };
579
580 let blocks = self.block_result.take()?;
581
582 Some(seal_blocks_with_access_lists(&self.client, blocks, access_lists.take()))
583 }
584}
585
586impl<Client> Future for FetchFullBlockRangeWithBalFuture<Client>
587where
588 Client: BlockClient<Header: Debug + BlockHeader + Sealable + Clone + Hash + Eq>
589 + BlockAccessListsClient
590 + 'static,
591{
592 type Output = Vec<SealedBlockWithAccessList<Client::Block>>;
593
594 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
595 let this = self.get_mut();
596
597 if this.block_result.is_none() &&
599 let Poll::Ready(blocks) = this.blocks.poll_unpin(cx)
600 {
601 this.block_result = Some(blocks);
602 }
603
604 this.poll_access_lists(cx);
605
606 if let Some(response) = this.take_response() {
607 return Poll::Ready(response)
608 }
609
610 Poll::Pending
611 }
612}
613
614enum OptionalBlockAccessListsState<Req> {
616 WaitingForBlocks {
618 requirement: BalRequirement,
620 },
621 Pending(Req),
623 Ready(Option<WithPeerId<BlockAccessLists>>),
625}
626
627impl<Client> Debug for FetchFullBlockFuture<Client>
628where
629 Client: BlockClient<Header: Debug, Body: Debug>,
630{
631 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
632 f.debug_struct("FetchFullBlockFuture")
633 .field("hash", &self.hash)
634 .field("header", &self.header)
635 .field("body", &self.body)
636 .finish()
637 }
638}
639
640struct FullBlockRequest<Client>
641where
642 Client: BlockClient,
643{
644 header: Option<SingleHeaderRequest<<Client as HeadersClient>::Output>>,
645 body: Option<SingleBodyRequest<<Client as BodiesClient>::Output>>,
646}
647
648impl<Client> FullBlockRequest<Client>
649where
650 Client: BlockClient,
651{
652 fn poll(&mut self, cx: &mut Context<'_>) -> Poll<ResponseResult<Client::Header, Client::Body>> {
653 if let Some(fut) = Pin::new(&mut self.header).as_pin_mut() &&
654 let Poll::Ready(res) = fut.poll(cx)
655 {
656 self.header = None;
657 return Poll::Ready(ResponseResult::Header(res))
658 }
659
660 if let Some(fut) = Pin::new(&mut self.body).as_pin_mut() &&
661 let Poll::Ready(res) = fut.poll(cx)
662 {
663 self.body = None;
664 return Poll::Ready(ResponseResult::Body(res))
665 }
666
667 Poll::Pending
668 }
669}
670
671enum ResponseResult<H, B> {
674 Header(PeerRequestResult<Option<H>>),
675 Body(PeerRequestResult<Option<B>>),
676}
677
678#[derive(Debug)]
680enum BodyResponse<B> {
681 Validated(B),
683 PendingValidation(WithPeerId<B>),
685}
686#[must_use = "futures do nothing unless polled"]
699#[expect(missing_debug_implementations)]
700pub struct FetchFullBlockRangeFuture<Client>
701where
702 Client: BlockClient,
703{
704 client: Client,
706 consensus: Arc<dyn Consensus<Client::Block>>,
708 start_hash: B256,
710 count: u64,
712 request: FullBlockRangeRequest<Client>,
714 headers: Option<Vec<SealedHeader<Client::Header>>>,
716 pending_headers: VecDeque<SealedHeader<Client::Header>>,
718 bodies: HashMap<SealedHeader<Client::Header>, BodyResponse<Client::Body>>,
720}
721
722impl<Client> FetchFullBlockRangeFuture<Client>
723where
724 Client: BlockClient<Header: Debug + BlockHeader + Sealable + Clone + Hash + Eq>,
725{
726 fn is_bodies_complete(&self) -> bool {
728 self.bodies.len() == self.count as usize
729 }
730
731 fn insert_body(&mut self, body_response: BodyResponse<Client::Body>) {
735 if let Some(header) = self.pending_headers.pop_front() {
736 self.bodies.insert(header, body_response);
737 }
738 }
739
740 fn insert_bodies(&mut self, bodies: impl IntoIterator<Item = BodyResponse<Client::Body>>) {
742 for body in bodies {
743 self.insert_body(body);
744 }
745 }
746
747 fn remaining_bodies_hashes(&self) -> Vec<B256> {
750 self.pending_headers.iter().map(|h| h.hash()).collect()
751 }
752
753 fn take_blocks(&mut self) -> Option<Vec<SealedBlock<Client::Block>>> {
761 if !self.is_bodies_complete() {
762 return None
764 }
765
766 let headers = self.headers.take()?;
767 let mut needs_retry = false;
768 let mut valid_responses = Vec::new();
769
770 for header in &headers {
771 if let Some(body_resp) = self.bodies.remove(header) {
772 let body = match body_resp {
774 BodyResponse::Validated(body) => body,
775 BodyResponse::PendingValidation(resp) => {
776 if let Err(err) =
778 self.consensus.validate_body_against_header(resp.data(), header)
779 {
780 debug!(target: "downloaders", %err, hash=?header.hash(), "Received wrong body in range response");
781 self.client.report_bad_message(resp.peer_id());
782
783 self.pending_headers.push_back(header.clone());
785 needs_retry = true;
786 continue
787 }
788
789 resp.into_data()
790 }
791 };
792
793 valid_responses
794 .push(SealedBlock::<Client::Block>::from_sealed_parts(header.clone(), body));
795 }
796 }
797
798 if needs_retry {
799 for block in valid_responses {
802 let (header, body) = block.split_sealed_header_body();
803 self.bodies.insert(header, BodyResponse::Validated(body));
804 }
805
806 self.headers = Some(headers);
808
809 let hashes = self.remaining_bodies_hashes();
811 self.request.bodies = Some(self.client.get_block_bodies(hashes));
812 return None
813 }
814
815 Some(valid_responses)
816 }
817
818 fn on_headers_response(&mut self, headers: WithPeerId<Vec<Client::Header>>) {
819 let (peer, mut headers_falling) =
820 headers.map(|h| h.into_iter().map(SealedHeader::seal_slow).collect::<Vec<_>>()).split();
821
822 if headers_falling.len() == self.count as usize {
824 headers_falling.sort_unstable_by_key(|h| Reverse(h.number()));
826
827 if headers_falling[0].hash() == self.start_hash {
829 let headers_rising = headers_falling.iter().rev().cloned().collect::<Vec<_>>();
830 if let Err(err) = self.consensus.validate_header_range(&headers_rising) {
832 debug!(target: "downloaders", %err, ?self.start_hash, "Received bad header response");
833 self.client.report_bad_message(peer);
834 }
835
836 let hashes = headers_falling.iter().map(|h| h.hash()).collect::<Vec<_>>();
838
839 self.pending_headers = headers_falling.clone().into();
841
842 if !self.has_bodies_request_started() {
844 self.request.bodies = Some(self.client.get_block_bodies(hashes));
846 }
847
848 self.headers = Some(headers_falling);
850 } else {
851 self.client.report_bad_message(peer);
853 }
854 }
855 }
856
857 const fn has_bodies_request_started(&self) -> bool {
860 self.request.bodies.is_some()
861 }
862
863 pub const fn start_hash(&self) -> B256 {
865 self.start_hash
866 }
867
868 pub const fn count(&self) -> u64 {
870 self.count
871 }
872}
873
874impl<Client> Future for FetchFullBlockRangeFuture<Client>
875where
876 Client: BlockClient<Header: Debug + BlockHeader + Sealable + Clone + Hash + Eq> + 'static,
877{
878 type Output = Vec<SealedBlock<Client::Block>>;
879
880 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
881 let this = self.get_mut();
882
883 loop {
884 match ready!(this.request.poll(cx)) {
885 RangeResponseResult::Header(res) => {
894 match res {
895 Ok(headers) => {
896 this.on_headers_response(headers);
897 }
898 Err(err) => {
899 debug!(target: "downloaders", %err, ?this.start_hash, "Header range download failed");
900 }
901 }
902
903 if this.headers.is_none() {
904 this.request.headers = Some(this.client.get_headers(HeadersRequest {
906 start: this.start_hash.into(),
907 limit: this.count,
908 direction: HeadersDirection::Falling,
909 }));
910 }
911 }
912 RangeResponseResult::Body(res) => {
918 match res {
919 Ok(bodies_resp) => {
920 let (peer, new_bodies) = bodies_resp.split();
921
922 this.insert_bodies(
924 new_bodies
925 .into_iter()
926 .map(|resp| WithPeerId::new(peer, resp))
927 .map(BodyResponse::PendingValidation),
928 );
929
930 if !this.is_bodies_complete() {
931 let req_hashes = this.remaining_bodies_hashes();
933
934 this.request.bodies = Some(this.client.get_block_bodies(req_hashes))
936 }
937 }
938 Err(err) => {
939 debug!(target: "downloaders", %err, ?this.start_hash, "Body range download failed");
940 }
941 }
942 if this.request.bodies.is_none() && !this.is_bodies_complete() {
943 let hashes = this.remaining_bodies_hashes();
956 if !hashes.is_empty() {
957 this.request.bodies = Some(this.client.get_block_bodies(hashes));
958 }
959 }
960 }
961 }
962
963 if let Some(res) = this.take_blocks() {
964 return Poll::Ready(res)
965 }
966 }
967 }
968}
969
970struct FullBlockRangeRequest<Client>
974where
975 Client: BlockClient,
976{
977 headers: Option<<Client as HeadersClient>::Output>,
978 bodies: Option<<Client as BodiesClient>::Output>,
979}
980
981impl<Client> FullBlockRangeRequest<Client>
982where
983 Client: BlockClient,
984{
985 fn poll(
986 &mut self,
987 cx: &mut Context<'_>,
988 ) -> Poll<RangeResponseResult<Client::Header, Client::Body>> {
989 if let Some(fut) = Pin::new(&mut self.headers).as_pin_mut() &&
990 let Poll::Ready(res) = fut.poll(cx)
991 {
992 self.headers = None;
993 return Poll::Ready(RangeResponseResult::Header(res))
994 }
995
996 if let Some(fut) = Pin::new(&mut self.bodies).as_pin_mut() &&
997 let Poll::Ready(res) = fut.poll(cx)
998 {
999 self.bodies = None;
1000 return Poll::Ready(RangeResponseResult::Body(res))
1001 }
1002
1003 Poll::Pending
1004 }
1005}
1006
1007enum RangeResponseResult<H, B> {
1010 Header(PeerRequestResult<Vec<H>>),
1011 Body(PeerRequestResult<Vec<B>>),
1012}
1013
1014#[derive(Debug, Clone)]
1016#[non_exhaustive]
1017pub struct NoopFullBlockClient<Net = EthNetworkPrimitives>(PhantomData<Net>);
1018
1019impl<Net> DownloadClient for NoopFullBlockClient<Net>
1021where
1022 Net: Debug + Send + Sync,
1023{
1024 fn report_bad_message(&self, _peer_id: PeerId) {}
1031
1032 fn num_connected_peers(&self) -> usize {
1038 0
1039 }
1040}
1041
1042impl<Net> BodiesClient for NoopFullBlockClient<Net>
1044where
1045 Net: NetworkPrimitives,
1046{
1047 type Body = Net::BlockBody;
1048 type Output = futures::future::Ready<PeerRequestResult<Vec<Self::Body>>>;
1050
1051 fn get_block_bodies_with_priority_and_range_hint(
1062 &self,
1063 _hashes: Vec<B256>,
1064 _priority: Priority,
1065 _range_hint: Option<RangeInclusive<u64>>,
1066 ) -> Self::Output {
1067 futures::future::ready(Ok(WithPeerId::new(PeerId::random(), vec![])))
1070 }
1071}
1072
1073impl<Net> HeadersClient for NoopFullBlockClient<Net>
1074where
1075 Net: NetworkPrimitives,
1076{
1077 type Header = Net::BlockHeader;
1078 type Output = futures::future::Ready<PeerRequestResult<Vec<Self::Header>>>;
1081
1082 fn get_headers_with_priority(
1096 &self,
1097 _request: HeadersRequest,
1098 _priority: Priority,
1099 ) -> Self::Output {
1100 futures::future::ready(Ok(WithPeerId::new(PeerId::random(), vec![])))
1101 }
1102}
1103
1104impl<Net> BlockClient for NoopFullBlockClient<Net>
1105where
1106 Net: NetworkPrimitives,
1107{
1108 type Block = Net::Block;
1109}
1110
1111impl<Net> BlockAccessListsClient for NoopFullBlockClient<Net>
1112where
1113 Net: NetworkPrimitives,
1114{
1115 type Output = futures::future::Ready<PeerRequestResult<BlockAccessLists>>;
1116
1117 fn get_block_access_lists_with_priority_and_requirement(
1118 &self,
1119 _hashes: Vec<B256>,
1120 _priority: Priority,
1121 _requirement: BalRequirement,
1122 ) -> Self::Output {
1123 futures::future::ready(Ok(WithPeerId::new(PeerId::random(), BlockAccessLists::default())))
1124 }
1125}
1126
1127impl<Net> Default for NoopFullBlockClient<Net> {
1128 fn default() -> Self {
1129 Self(PhantomData::<Net>)
1130 }
1131}
1132
1133fn seal_block_access_list_for_block<B: Block>(
1139 block: &SealedBlock<B>,
1140 bal: WithPeerId<Option<Bytes>>,
1141) -> Result<Option<RawBal>, PeerId> {
1142 let Some(expected) = block.header().block_access_list_hash() else { return Ok(None) };
1143
1144 let (peer, bal) = bal.split();
1145 let Some(bal) = bal else { return Ok(None) };
1146 let raw_bal = RawBal::new(bal);
1147 let computed = raw_bal.hash();
1148 if computed == expected {
1149 return Ok(Some(raw_bal))
1150 }
1151
1152 debug!(
1153 target: "downloaders",
1154 block_hash = ?block.hash(),
1155 ?computed,
1156 ?expected,
1157 "Received block access list with wrong hash",
1158 );
1159 Err(peer)
1160}
1161
1162fn seal_blocks_with_access_lists<Client>(
1169 client: &Client,
1170 blocks: Vec<SealedBlock<Client::Block>>,
1171 access_lists: Option<WithPeerId<BlockAccessLists>>,
1172) -> Vec<SealedBlockWithAccessList<Client::Block>>
1173where
1174 Client: BlockClient,
1175{
1176 let Some(access_lists) = access_lists else {
1177 return blocks.into_iter().map(SealedBlockWith::from_block).collect()
1178 };
1179
1180 let (peer, access_lists) = access_lists.split();
1181 let expected = blocks.len();
1182 let received = access_lists.0.len();
1183
1184 if received > expected {
1185 trace!(
1186 target: "downloaders",
1187 expected,
1188 received,
1189 "Ignoring overlong access list range response",
1190 );
1191 return blocks.into_iter().map(SealedBlockWith::from_block).collect()
1192 }
1193
1194 let mut access_lists = access_lists.0.into_iter();
1195 let mut blocks = blocks.into_iter();
1196 let mut response = Vec::with_capacity(expected);
1197
1198 for block in blocks.by_ref() {
1199 let Some(bal) = access_lists.next() else {
1200 response.push(SealedBlockWith::from_block(block));
1203 break
1204 };
1205
1206 match seal_block_access_list_for_block(&block, WithPeerId::new(peer, bal)) {
1207 Ok(raw_bal) => response.push(SealedBlockWith::new(block, raw_bal)),
1208 Err(peer) => {
1209 client.report_bad_message(peer);
1212 response.push(SealedBlockWith::from_block(block));
1213 break
1214 }
1215 }
1216 }
1217
1218 response.extend(blocks.map(SealedBlockWith::from_block));
1219 response
1220}
1221
1222#[cfg(test)]
1223mod tests {
1224 use reth_ethereum_primitives::BlockBody;
1225
1226 use super::*;
1227 use crate::{error::RequestError, test_utils::TestFullBlockClient};
1228 use alloy_consensus::Header;
1229 use alloy_primitives::{keccak256, map::B256Map, Bytes};
1230 use parking_lot::Mutex;
1231 use std::{
1232 ops::Range,
1233 sync::{
1234 atomic::{AtomicBool, AtomicUsize, Ordering},
1235 Arc,
1236 },
1237 };
1238
1239 const EMPTY_LIST_CODE: u8 = 0xc0;
1240 use tokio::time::{timeout, Duration};
1241
1242 fn sealed_header_with_access_list_hash(bal: &Bytes) -> SealedHeader {
1243 let header =
1244 Header { block_access_list_hash: Some(keccak256(bal.as_ref())), ..Default::default() };
1245 SealedHeader::seal_slow(header)
1246 }
1247
1248 fn range_access_lists<B: Block>(
1249 blocks: &[SealedBlockWithAccessList<B>],
1250 ) -> Vec<Option<RawBal>> {
1251 blocks.iter().map(|block| block.data().clone()).collect()
1252 }
1253
1254 #[tokio::test]
1255 async fn download_single_full_block() {
1256 let client = TestFullBlockClient::default();
1257 let header: SealedHeader = SealedHeader::default();
1258 let body = BlockBody::default();
1259 client.insert(header.clone(), body.clone());
1260 let client = FullBlockClient::test_client(client);
1261
1262 let received = client.get_full_block(header.hash()).await;
1263 assert_eq!(received, SealedBlock::from_sealed_parts(header, body));
1264 }
1265
1266 #[tokio::test]
1267 async fn download_single_full_block_range() {
1268 let client = TestFullBlockClient::default();
1269 let header: SealedHeader = SealedHeader::default();
1270 let body = BlockBody::default();
1271 client.insert(header.clone(), body.clone());
1272 let client = FullBlockClient::test_client(client);
1273
1274 let received = client.get_full_block_range(header.hash(), 1).await;
1275 let received = received.first().expect("response should include a block");
1276 assert_eq!(*received, SealedBlock::from_sealed_parts(header, body));
1277 }
1278
1279 #[tokio::test]
1280 async fn download_single_full_block_with_access_lists() {
1281 let client = FullBlockWithAccessListsClient::default();
1282 let body = BlockBody::default();
1283 let bal = Bytes::from_static(&[EMPTY_LIST_CODE]);
1284 let header = sealed_header_with_access_list_hash(&bal);
1285 client.insert(header.clone(), body.clone(), bal.clone());
1286
1287 let request_count = Arc::clone(&client.access_list_requests);
1288 let client = FullBlockClient::test_client(client);
1289
1290 let received = client.get_full_block_with_access_lists(header.hash()).await;
1291 let expected_raw_bal = RawBal::from(bal);
1292
1293 assert_eq!(received.block(), &SealedBlock::from_sealed_parts(header, body));
1294 assert_eq!(received.data().as_ref(), Some(&expected_raw_bal));
1295 assert_eq!(request_count.load(Ordering::SeqCst), 1);
1296 }
1297
1298 #[tokio::test]
1299 async fn download_single_full_block_with_access_lists_uses_requested_requirement() {
1300 let client = FullBlockWithAccessListsClient::default();
1301 let body = BlockBody::default();
1302 let bal = Bytes::from_static(&[EMPTY_LIST_CODE]);
1303 let header = sealed_header_with_access_list_hash(&bal);
1304 client.insert(header.clone(), body.clone(), bal.clone());
1305
1306 let requirement = Arc::clone(&client.last_access_list_requirement);
1307 let client = FullBlockClient::test_client(client);
1308
1309 let received = client
1310 .get_full_block_with_access_lists_with_requirement(
1311 header.hash(),
1312 BalRequirement::Mandatory,
1313 )
1314 .await;
1315
1316 let expected_raw_bal = RawBal::from(bal);
1317 assert_eq!(received.block(), &SealedBlock::from_sealed_parts(header, body));
1318 assert_eq!(received.data().as_ref(), Some(&expected_raw_bal));
1319 assert_eq!(*requirement.lock(), Some(BalRequirement::Mandatory));
1320 }
1321
1322 #[tokio::test]
1323 async fn download_single_full_block_with_access_lists_waits_for_pending_access_lists() {
1324 let client = FullBlockWithAccessListsClient::default();
1325 client.set_access_list_pending_polls(1);
1326
1327 let body = BlockBody::default();
1328 let bal = Bytes::from_static(&[EMPTY_LIST_CODE]);
1329 let header = sealed_header_with_access_list_hash(&bal);
1330 client.insert(header.clone(), body.clone(), bal.clone());
1331
1332 let request_count = Arc::clone(&client.access_list_requests);
1333 let client = FullBlockClient::test_client(client);
1334
1335 let received =
1336 timeout(Duration::from_secs(1), client.get_full_block_with_access_lists(header.hash()))
1337 .await
1338 .expect("access list request should complete");
1339
1340 let expected_raw_bal = RawBal::from(bal);
1341 assert_eq!(received.block(), &SealedBlock::from_sealed_parts(header, body));
1342 assert_eq!(received.data().as_ref(), Some(&expected_raw_bal));
1343 assert_eq!(request_count.load(Ordering::SeqCst), 1);
1344 }
1345
1346 #[tokio::test]
1347 async fn download_single_full_block_with_access_lists_rejects_wrong_hash() {
1348 let client = FullBlockWithAccessListsClient::default();
1349 let body = BlockBody::default();
1350 let expected_bal = Bytes::from_static(&[EMPTY_LIST_CODE]);
1351 let wrong_bal = Bytes::from_static(&[0xc1, 0x01]);
1352 let header = sealed_header_with_access_list_hash(&expected_bal);
1353 client.insert(header.clone(), body.clone(), wrong_bal);
1354
1355 let bad_messages = Arc::clone(&client.bad_messages);
1356 let client = FullBlockClient::test_client(client);
1357
1358 let received =
1359 timeout(Duration::from_secs(1), client.get_full_block_with_access_lists(header.hash()))
1360 .await
1361 .expect("block request should complete without access lists");
1362
1363 assert_eq!(received.block(), &SealedBlock::from_sealed_parts(header, body));
1364 assert!(received.data().is_none());
1365 assert_eq!(bad_messages.load(Ordering::SeqCst), 1);
1366 }
1367
1368 #[tokio::test]
1369 async fn download_single_full_block_with_access_lists_treats_none_as_unavailable() {
1370 let client = FullBlockWithAccessListsClient::default();
1371 let body = BlockBody::default();
1372 let expected_bal = Bytes::from_static(&[0xc1, 0x01]);
1373 let header = sealed_header_with_access_list_hash(&expected_bal);
1374 client.inner.insert(header.clone(), body.clone());
1375
1376 let bad_messages = Arc::clone(&client.bad_messages);
1377 let client = FullBlockClient::test_client(client);
1378
1379 let received =
1380 timeout(Duration::from_secs(1), client.get_full_block_with_access_lists(header.hash()))
1381 .await
1382 .expect("block request should complete without access lists");
1383
1384 assert_eq!(received.block(), &SealedBlock::from_sealed_parts(header, body));
1385 assert!(received.data().is_none());
1386 assert_eq!(bad_messages.load(Ordering::SeqCst), 0);
1387 }
1388
1389 #[tokio::test]
1390 async fn download_single_full_block_with_access_lists_rejects_wrong_empty_list() {
1391 let client = FullBlockWithAccessListsClient::default();
1392 let body = BlockBody::default();
1393 let expected_bal = Bytes::from_static(&[0xc1, 0x01]);
1394 let wrong_empty_bal = Bytes::from_static(&[EMPTY_LIST_CODE]);
1395 let header = sealed_header_with_access_list_hash(&expected_bal);
1396 client.insert(header.clone(), body.clone(), wrong_empty_bal);
1397
1398 let bad_messages = Arc::clone(&client.bad_messages);
1399 let client = FullBlockClient::test_client(client);
1400
1401 let received =
1402 timeout(Duration::from_secs(1), client.get_full_block_with_access_lists(header.hash()))
1403 .await
1404 .expect("block request should complete without access lists");
1405
1406 assert_eq!(received.block(), &SealedBlock::from_sealed_parts(header, body));
1407 assert!(received.data().is_none());
1408 assert_eq!(bad_messages.load(Ordering::SeqCst), 1);
1409 }
1410
1411 #[tokio::test]
1412 async fn download_single_full_block_with_access_lists_returns_none_after_empty_response() {
1413 let client = FullBlockWithAccessListsClient::default();
1414 client.empty_first_response.store(true, Ordering::SeqCst);
1415
1416 let body = BlockBody::default();
1417 let bal = Bytes::from_static(&[EMPTY_LIST_CODE]);
1418 let header = sealed_header_with_access_list_hash(&bal);
1419 client.insert(header.clone(), body.clone(), bal.clone());
1420
1421 let request_count = Arc::clone(&client.access_list_requests);
1422 let bad_messages = Arc::clone(&client.bad_messages);
1423 let client = FullBlockClient::test_client(client);
1424
1425 let received =
1426 timeout(Duration::from_secs(1), client.get_full_block_with_access_lists(header.hash()))
1427 .await
1428 .expect("block request should complete without access lists");
1429
1430 assert_eq!(received.block(), &SealedBlock::from_sealed_parts(header, body));
1431 assert!(received.data().is_none());
1432 assert_eq!(request_count.load(Ordering::SeqCst), 1);
1433 assert_eq!(bad_messages.load(Ordering::SeqCst), 0);
1434 }
1435
1436 #[tokio::test]
1437 async fn download_single_full_block_with_access_lists_returns_block_when_unavailable() {
1438 let client = FullBlockWithAccessListsClient::default();
1439 client.set_access_lists_unsupported(true);
1440
1441 let body = BlockBody::default();
1442 let bal = Bytes::from_static(&[EMPTY_LIST_CODE]);
1443 let header = sealed_header_with_access_list_hash(&bal);
1444 client.insert(header.clone(), body.clone(), bal);
1445
1446 let request_count = Arc::clone(&client.access_list_requests);
1447 let requirement = Arc::clone(&client.last_access_list_requirement);
1448 let client = FullBlockClient::test_client(client);
1449
1450 let received =
1451 timeout(Duration::from_secs(1), client.get_full_block_with_access_lists(header.hash()))
1452 .await
1453 .expect("block request should complete without access lists");
1454
1455 assert_eq!(received.block(), &SealedBlock::from_sealed_parts(header, body));
1456 assert!(received.data().is_none());
1457 assert_eq!(request_count.load(Ordering::SeqCst), 1);
1458 assert_eq!(
1459 *requirement.lock(),
1460 Some(BalRequirement::Optional),
1461 "single block BAL lookup should be best-effort"
1462 );
1463 }
1464
1465 fn insert_headers_into_client(
1467 client: &TestFullBlockClient,
1468 range: Range<usize>,
1469 ) -> (SealedHeader, BlockBody) {
1470 let mut sealed_header: SealedHeader = SealedHeader::default();
1471 let body = BlockBody::default();
1472 for _ in range {
1473 let (mut header, hash) = sealed_header.split();
1474 header.parent_hash = hash;
1476 header.number += 1;
1477
1478 sealed_header = SealedHeader::seal_slow(header);
1479
1480 client.insert(sealed_header.clone(), body.clone());
1481 }
1482
1483 (sealed_header, body)
1484 }
1485
1486 #[derive(Clone, Debug)]
1487 struct FullBlockWithAccessListsClient {
1488 inner: TestFullBlockClient,
1489 access_lists: Arc<Mutex<B256Map<Bytes>>>,
1490 access_list_requests: Arc<AtomicUsize>,
1491 access_list_soft_limit: Arc<AtomicUsize>,
1492 access_list_pending_polls: Arc<AtomicUsize>,
1493 extra_access_list_entries: Arc<AtomicUsize>,
1494 unsupported_access_lists: Arc<AtomicBool>,
1495 last_access_list_requirement: Arc<Mutex<Option<BalRequirement>>>,
1496 bad_messages: Arc<AtomicUsize>,
1497 empty_first_response: Arc<AtomicBool>,
1498 }
1499
1500 impl Default for FullBlockWithAccessListsClient {
1501 fn default() -> Self {
1502 Self {
1503 inner: TestFullBlockClient::default(),
1504 access_lists: Arc::new(Mutex::new(B256Map::default())),
1505 access_list_requests: Arc::new(AtomicUsize::new(0)),
1506 access_list_soft_limit: Arc::new(AtomicUsize::new(usize::MAX)),
1507 access_list_pending_polls: Arc::new(AtomicUsize::new(0)),
1508 extra_access_list_entries: Arc::new(AtomicUsize::new(0)),
1509 unsupported_access_lists: Arc::new(AtomicBool::new(false)),
1510 last_access_list_requirement: Arc::new(Mutex::new(None)),
1511 bad_messages: Arc::new(AtomicUsize::new(0)),
1512 empty_first_response: Arc::new(AtomicBool::new(false)),
1513 }
1514 }
1515 }
1516
1517 impl FullBlockWithAccessListsClient {
1518 fn insert(&self, header: SealedHeader, body: BlockBody, bal: Bytes) {
1519 self.inner.insert(header.clone(), body);
1520 self.access_lists.lock().insert(header.hash(), bal);
1521 }
1522
1523 fn set_access_list_soft_limit(&self, limit: usize) {
1524 self.access_list_soft_limit.store(limit, Ordering::SeqCst);
1525 }
1526
1527 fn set_access_list_pending_polls(&self, polls: usize) {
1528 self.access_list_pending_polls.store(polls, Ordering::SeqCst);
1529 }
1530
1531 fn set_extra_access_list_entries(&self, count: usize) {
1532 self.extra_access_list_entries.store(count, Ordering::SeqCst);
1533 }
1534
1535 fn set_access_lists_unsupported(&self, unsupported: bool) {
1536 self.unsupported_access_lists.store(unsupported, Ordering::SeqCst);
1537 }
1538 }
1539
1540 fn insert_headers_with_access_lists_into_client(
1542 client: &FullBlockWithAccessListsClient,
1543 range: Range<usize>,
1544 ) -> (SealedHeader, BlockBody) {
1545 let mut sealed_header: SealedHeader = SealedHeader::default();
1546 let body = BlockBody::default();
1547 for block_idx in range {
1548 let (mut header, hash) = sealed_header.split();
1549 header.parent_hash = hash;
1550 header.number += 1;
1551 let bal = Bytes::from(vec![0xc1, block_idx as u8]);
1552 header.block_access_list_hash = Some(keccak256(bal.as_ref()));
1553
1554 sealed_header = SealedHeader::seal_slow(header);
1555
1556 client.insert(sealed_header.clone(), body.clone(), bal);
1557 }
1558
1559 (sealed_header, body)
1560 }
1561
1562 impl DownloadClient for FullBlockWithAccessListsClient {
1563 fn report_bad_message(&self, peer_id: PeerId) {
1564 self.bad_messages.fetch_add(1, Ordering::SeqCst);
1565 self.inner.report_bad_message(peer_id);
1566 }
1567
1568 fn num_connected_peers(&self) -> usize {
1569 self.inner.num_connected_peers()
1570 }
1571 }
1572
1573 impl HeadersClient for FullBlockWithAccessListsClient {
1574 type Header = <TestFullBlockClient as HeadersClient>::Header;
1575 type Output = <TestFullBlockClient as HeadersClient>::Output;
1576
1577 fn get_headers_with_priority(
1578 &self,
1579 request: HeadersRequest,
1580 priority: Priority,
1581 ) -> Self::Output {
1582 self.inner.get_headers_with_priority(request, priority)
1583 }
1584 }
1585
1586 impl BodiesClient for FullBlockWithAccessListsClient {
1587 type Body = <TestFullBlockClient as BodiesClient>::Body;
1588 type Output = <TestFullBlockClient as BodiesClient>::Output;
1589
1590 fn get_block_bodies_with_priority_and_range_hint(
1591 &self,
1592 hashes: Vec<B256>,
1593 priority: Priority,
1594 range_hint: Option<RangeInclusive<u64>>,
1595 ) -> Self::Output {
1596 self.inner.get_block_bodies_with_priority_and_range_hint(hashes, priority, range_hint)
1597 }
1598 }
1599
1600 struct MaybePendingAccessLists {
1601 response: Option<PeerRequestResult<BlockAccessLists>>,
1602 pending_polls: usize,
1603 }
1604
1605 impl MaybePendingAccessLists {
1606 const fn new(response: PeerRequestResult<BlockAccessLists>, pending_polls: usize) -> Self {
1607 Self { response: Some(response), pending_polls }
1608 }
1609 }
1610
1611 impl std::future::Future for MaybePendingAccessLists {
1612 type Output = PeerRequestResult<BlockAccessLists>;
1613
1614 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1615 if self.pending_polls > 0 {
1616 self.pending_polls -= 1;
1617 cx.waker().wake_by_ref();
1618 return Poll::Pending
1619 }
1620
1621 Poll::Ready(self.response.take().expect("future polled after completion"))
1622 }
1623 }
1624
1625 impl BlockAccessListsClient for FullBlockWithAccessListsClient {
1626 type Output = MaybePendingAccessLists;
1627
1628 fn get_block_access_lists_with_priority_and_requirement(
1629 &self,
1630 hashes: Vec<B256>,
1631 _priority: Priority,
1632 requirement: BalRequirement,
1633 ) -> Self::Output {
1634 self.access_list_requests.fetch_add(1, Ordering::SeqCst);
1635 *self.last_access_list_requirement.lock() = Some(requirement);
1636 let pending_polls = self.access_list_pending_polls.swap(0, Ordering::SeqCst);
1637
1638 if self.unsupported_access_lists.load(Ordering::SeqCst) {
1639 return MaybePendingAccessLists::new(
1640 Err(RequestError::UnsupportedCapability),
1641 pending_polls,
1642 )
1643 }
1644
1645 if self.empty_first_response.swap(false, Ordering::SeqCst) {
1646 return MaybePendingAccessLists::new(
1647 Ok(WithPeerId::new(PeerId::random(), BlockAccessLists(Vec::new()))),
1648 pending_polls,
1649 )
1650 }
1651
1652 let mut access_lists: Vec<_> = hashes
1653 .into_iter()
1654 .take(self.access_list_soft_limit.load(Ordering::SeqCst))
1655 .map(|hash| self.access_lists.lock().get(&hash).cloned())
1656 .collect();
1657 for _ in 0..self.extra_access_list_entries.load(Ordering::SeqCst) {
1658 access_lists.push(None);
1659 }
1660
1661 MaybePendingAccessLists::new(
1662 Ok(WithPeerId::new(PeerId::random(), BlockAccessLists(access_lists))),
1663 pending_polls,
1664 )
1665 }
1666 }
1667
1668 impl BlockClient for FullBlockWithAccessListsClient {
1669 type Block = reth_ethereum_primitives::Block;
1670 }
1671
1672 #[derive(Clone, Debug)]
1673 struct FailingBodiesClient {
1674 inner: TestFullBlockClient,
1675 fail_on: usize,
1676 body_requests: Arc<AtomicUsize>,
1677 }
1678
1679 impl FailingBodiesClient {
1680 fn new(inner: TestFullBlockClient, fail_on: usize) -> Self {
1681 Self { inner, fail_on, body_requests: Arc::new(AtomicUsize::new(0)) }
1682 }
1683 }
1684
1685 impl DownloadClient for FailingBodiesClient {
1686 fn report_bad_message(&self, peer_id: PeerId) {
1687 self.inner.report_bad_message(peer_id);
1688 }
1689
1690 fn num_connected_peers(&self) -> usize {
1691 self.inner.num_connected_peers()
1692 }
1693 }
1694
1695 impl HeadersClient for FailingBodiesClient {
1696 type Header = <TestFullBlockClient as HeadersClient>::Header;
1697 type Output = <TestFullBlockClient as HeadersClient>::Output;
1698
1699 fn get_headers_with_priority(
1700 &self,
1701 request: HeadersRequest,
1702 priority: Priority,
1703 ) -> Self::Output {
1704 self.inner.get_headers_with_priority(request, priority)
1705 }
1706 }
1707
1708 impl BodiesClient for FailingBodiesClient {
1709 type Body = <TestFullBlockClient as BodiesClient>::Body;
1710 type Output = <TestFullBlockClient as BodiesClient>::Output;
1711
1712 fn get_block_bodies_with_priority_and_range_hint(
1713 &self,
1714 hashes: Vec<B256>,
1715 priority: Priority,
1716 range_hint: Option<RangeInclusive<u64>>,
1717 ) -> Self::Output {
1718 let attempt = self.body_requests.fetch_add(1, Ordering::SeqCst);
1719 if attempt == self.fail_on {
1720 return futures::future::ready(Err(RequestError::Timeout))
1721 }
1722
1723 self.inner.get_block_bodies_with_priority_and_range_hint(hashes, priority, range_hint)
1724 }
1725 }
1726
1727 impl BlockClient for FailingBodiesClient {
1728 type Block = reth_ethereum_primitives::Block;
1729 }
1730
1731 #[tokio::test]
1732 async fn download_full_block_range() {
1733 let client = TestFullBlockClient::default();
1734 let (header, body) = insert_headers_into_client(&client, 0..50);
1735 let client = FullBlockClient::test_client(client);
1736
1737 let received = client.get_full_block_range(header.hash(), 1).await;
1738 let received = received.first().expect("response should include a block");
1739 assert_eq!(*received, SealedBlock::from_sealed_parts(header.clone(), body));
1740
1741 let received = client.get_full_block_range(header.hash(), 10).await;
1742 assert_eq!(received.len(), 10);
1743 for (i, block) in received.iter().enumerate() {
1744 let expected_number = header.number - i as u64;
1745 assert_eq!(block.number, expected_number);
1746 }
1747 }
1748
1749 #[tokio::test]
1750 async fn download_full_block_range_over_soft_limit() {
1751 let client = TestFullBlockClient::default();
1753 let (header, body) = insert_headers_into_client(&client, 0..50);
1754 let client = FullBlockClient::test_client(client);
1755
1756 let received = client.get_full_block_range(header.hash(), 1).await;
1757 let received = received.first().expect("response should include a block");
1758 assert_eq!(*received, SealedBlock::from_sealed_parts(header.clone(), body));
1759
1760 let received = client.get_full_block_range(header.hash(), 50).await;
1761 assert_eq!(received.len(), 50);
1762 for (i, block) in received.iter().enumerate() {
1763 let expected_number = header.number - i as u64;
1764 assert_eq!(block.number, expected_number);
1765 }
1766 }
1767
1768 #[tokio::test]
1769 async fn download_full_block_range_retries_after_body_error() {
1770 let mut client = TestFullBlockClient::default();
1771 client.set_soft_limit(2);
1772 let (header, _) = insert_headers_into_client(&client, 0..3);
1773
1774 let client = FailingBodiesClient::new(client, 1);
1775 let body_requests = Arc::clone(&client.body_requests);
1776 let client = FullBlockClient::test_client(client);
1777
1778 let received =
1779 timeout(Duration::from_secs(1), client.get_full_block_range(header.hash(), 3))
1780 .await
1781 .expect("body request retry should complete");
1782
1783 assert_eq!(received.len(), 3);
1784 assert_eq!(body_requests.load(Ordering::SeqCst), 3);
1785 }
1786
1787 #[tokio::test]
1788 async fn download_full_block_range_with_access_lists() {
1789 let client = FullBlockWithAccessListsClient::default();
1790 let (header, _) = insert_headers_with_access_lists_into_client(&client, 0..3);
1791
1792 let access_lists = Arc::clone(&client.access_lists);
1793 let request_count = Arc::clone(&client.access_list_requests);
1794 let requirement = Arc::clone(&client.last_access_list_requirement);
1795 let client = FullBlockClient::test_client(client);
1796
1797 let response = timeout(
1798 Duration::from_secs(1),
1799 client.get_full_block_range_with_optional_access_lists(header.hash(), 3),
1800 )
1801 .await
1802 .expect("range request should complete");
1803
1804 let blocks = response;
1805 assert_eq!(blocks.len(), 3);
1806 let expected = {
1807 let bals = access_lists.lock();
1808 blocks
1809 .iter()
1810 .map(|block| {
1811 let bal = bals.get(&block.block().hash()).cloned().expect("access list exists");
1812 Some(RawBal::from(bal))
1813 })
1814 .collect::<Vec<_>>()
1815 };
1816 assert_eq!(range_access_lists(&blocks), expected);
1817 assert_eq!(request_count.load(Ordering::SeqCst), 1);
1818 assert_eq!(*requirement.lock(), Some(BalRequirement::Optional));
1819 }
1820
1821 #[tokio::test]
1822 async fn download_full_block_range_with_access_lists_returns_none_for_empty_response() {
1823 let client = FullBlockWithAccessListsClient::default();
1824 client.empty_first_response.store(true, Ordering::SeqCst);
1825 let (header, _) = insert_headers_with_access_lists_into_client(&client, 0..3);
1826
1827 let request_count = Arc::clone(&client.access_list_requests);
1828 let client = FullBlockClient::test_client(client);
1829
1830 let response = timeout(
1831 Duration::from_secs(1),
1832 client.get_full_block_range_with_optional_access_lists(header.hash(), 3),
1833 )
1834 .await
1835 .expect("range request should complete without access lists");
1836
1837 let blocks = response;
1838 assert_eq!(blocks.len(), 3);
1839 assert_eq!(range_access_lists(&blocks), vec![None; blocks.len()]);
1840 assert_eq!(request_count.load(Ordering::SeqCst), 1);
1841 }
1842
1843 #[tokio::test]
1844 async fn download_full_block_range_with_access_lists_uses_requested_requirement() {
1845 let client = FullBlockWithAccessListsClient::default();
1846 let (header, _) = insert_headers_with_access_lists_into_client(&client, 0..3);
1847
1848 let requirement = Arc::clone(&client.last_access_list_requirement);
1849 let client = FullBlockClient::test_client(client);
1850
1851 let blocks = timeout(
1852 Duration::from_secs(1),
1853 client.get_full_block_range_with_optional_access_lists_with_requirement(
1854 header.hash(),
1855 3,
1856 BalRequirement::Mandatory,
1857 ),
1858 )
1859 .await
1860 .expect("range request should complete");
1861
1862 assert_eq!(blocks.len(), 3);
1863 assert_eq!(*requirement.lock(), Some(BalRequirement::Mandatory));
1864 }
1865
1866 #[tokio::test]
1867 async fn download_full_block_range_with_access_lists_preserves_short_response() {
1868 let client = FullBlockWithAccessListsClient::default();
1869 client.set_access_list_soft_limit(2);
1870 let (header, _) = insert_headers_with_access_lists_into_client(&client, 0..5);
1871
1872 let access_lists = Arc::clone(&client.access_lists);
1873 let request_count = Arc::clone(&client.access_list_requests);
1874 let client = FullBlockClient::test_client(client);
1875
1876 let blocks = timeout(
1877 Duration::from_secs(1),
1878 client.get_full_block_range_with_optional_access_lists(header.hash(), 5),
1879 )
1880 .await
1881 .expect("range request should complete without access lists");
1882
1883 assert_eq!(blocks.len(), 5);
1884 let expected = {
1885 let bals = access_lists.lock();
1886 blocks
1887 .iter()
1888 .enumerate()
1889 .map(|(idx, block)| {
1890 if idx >= 2 {
1891 return None
1892 }
1893
1894 let bal = bals.get(&block.block().hash()).cloned().expect("access list exists");
1895 Some(RawBal::from(bal))
1896 })
1897 .collect::<Vec<_>>()
1898 };
1899 assert_eq!(range_access_lists(&blocks), expected);
1900 assert_eq!(request_count.load(Ordering::SeqCst), 1);
1901 }
1902
1903 #[tokio::test]
1904 async fn download_full_block_range_with_access_lists_preserves_unavailable_entries() {
1905 let client = FullBlockWithAccessListsClient::default();
1906 let (header, _) = insert_headers_with_access_lists_into_client(&client, 0..3);
1907 client.access_lists.lock().remove(&header.hash());
1908
1909 let access_lists = Arc::clone(&client.access_lists);
1910 let bad_messages = Arc::clone(&client.bad_messages);
1911 let client = FullBlockClient::test_client(client);
1912
1913 let blocks = timeout(
1914 Duration::from_secs(1),
1915 client.get_full_block_range_with_optional_access_lists(header.hash(), 3),
1916 )
1917 .await
1918 .expect("range request should complete");
1919
1920 assert_eq!(blocks.len(), 3);
1921 let expected = {
1922 let bals = access_lists.lock();
1923 blocks
1924 .iter()
1925 .map(|block| {
1926 if block.block().hash() == header.hash() {
1927 return None
1928 }
1929
1930 let bal = bals.get(&block.block().hash()).cloned().expect("access list exists");
1931 Some(RawBal::from(bal))
1932 })
1933 .collect::<Vec<_>>()
1934 };
1935 assert_eq!(range_access_lists(&blocks), expected);
1936 assert_eq!(bad_messages.load(Ordering::SeqCst), 0);
1937 }
1938
1939 #[tokio::test]
1940 async fn download_full_block_range_with_access_lists_returns_none_when_unavailable() {
1941 let client = FullBlockWithAccessListsClient::default();
1942 client.set_access_lists_unsupported(true);
1943 let (header, _) = insert_headers_with_access_lists_into_client(&client, 0..3);
1944
1945 let request_count = Arc::clone(&client.access_list_requests);
1946 let client = FullBlockClient::test_client(client);
1947
1948 let blocks = timeout(
1949 Duration::from_secs(1),
1950 client.get_full_block_range_with_optional_access_lists(header.hash(), 3),
1951 )
1952 .await
1953 .expect("range request should complete without access lists");
1954
1955 assert_eq!(blocks.len(), 3);
1956 assert_eq!(range_access_lists(&blocks), vec![None; blocks.len()]);
1957 assert_eq!(request_count.load(Ordering::SeqCst), 1);
1958 }
1959
1960 #[tokio::test]
1961 async fn download_full_block_range_with_access_lists_ignores_long_response() {
1962 let client = FullBlockWithAccessListsClient::default();
1963 client.set_extra_access_list_entries(1);
1964 let (header, _) = insert_headers_with_access_lists_into_client(&client, 0..3);
1965
1966 let request_count = Arc::clone(&client.access_list_requests);
1967 let bad_messages = Arc::clone(&client.bad_messages);
1968 let client = FullBlockClient::test_client(client);
1969
1970 let blocks = timeout(
1971 Duration::from_secs(1),
1972 client.get_full_block_range_with_optional_access_lists(header.hash(), 3),
1973 )
1974 .await
1975 .expect("range request should complete without access lists");
1976
1977 assert_eq!(blocks.len(), 3);
1978 assert_eq!(range_access_lists(&blocks), vec![None; blocks.len()]);
1979 assert_eq!(request_count.load(Ordering::SeqCst), 1);
1980 assert_eq!(bad_messages.load(Ordering::SeqCst), 0);
1981 }
1982
1983 #[tokio::test]
1984 async fn download_full_block_range_with_access_lists_rejects_wrong_hash() {
1985 let client = FullBlockWithAccessListsClient::default();
1986 let (header, _) = insert_headers_with_access_lists_into_client(&client, 0..3);
1987 client.access_lists.lock().insert(header.hash(), Bytes::from_static(&[0xc1, 0x7f]));
1988
1989 let bad_messages = Arc::clone(&client.bad_messages);
1990 let client = FullBlockClient::test_client(client);
1991
1992 let blocks = timeout(
1993 Duration::from_secs(1),
1994 client.get_full_block_range_with_optional_access_lists(header.hash(), 3),
1995 )
1996 .await
1997 .expect("range request should complete without access lists");
1998
1999 assert_eq!(blocks.len(), 3);
2000 assert_eq!(range_access_lists(&blocks), vec![None; blocks.len()]);
2001 assert_eq!(bad_messages.load(Ordering::SeqCst), 1);
2002 }
2003
2004 #[tokio::test]
2005 async fn download_full_block_range_with_access_lists_preserves_valid_prefix_until_wrong_hash() {
2006 let client = FullBlockWithAccessListsClient::default();
2007 let (header, _) = insert_headers_with_access_lists_into_client(&client, 0..3);
2008 let first_bal =
2009 client.access_lists.lock().get(&header.hash()).cloned().expect("access list exists");
2010 let second_hash = header.parent_hash;
2011 client.access_lists.lock().insert(second_hash, Bytes::from_static(&[0xc1, 0x7f]));
2012
2013 let bad_messages = Arc::clone(&client.bad_messages);
2014 let client = FullBlockClient::test_client(client);
2015
2016 let blocks = timeout(
2017 Duration::from_secs(1),
2018 client.get_full_block_range_with_optional_access_lists(header.hash(), 3),
2019 )
2020 .await
2021 .expect("range request should complete without unvalidated access lists");
2022
2023 assert_eq!(blocks.len(), 3);
2024 assert_eq!(blocks[1].block().hash(), second_hash);
2025 assert_eq!(range_access_lists(&blocks), vec![Some(RawBal::from(first_bal)), None, None]);
2026 assert_eq!(bad_messages.load(Ordering::SeqCst), 1);
2027 }
2028
2029 #[tokio::test]
2030 async fn download_full_block_range_with_invalid_header() {
2031 let client = TestFullBlockClient::default();
2032 let range_length: usize = 3;
2033 let (header, _) = insert_headers_into_client(&client, 0..range_length);
2034
2035 let test_consensus = reth_consensus::test_utils::TestConsensus::default();
2036 test_consensus.set_fail_validation(true);
2037 test_consensus.set_fail_body_against_header(false);
2038 let client = FullBlockClient::new(client, Arc::new(test_consensus));
2039
2040 let received = client.get_full_block_range(header.hash(), range_length as u64).await;
2041
2042 assert_eq!(received.len(), range_length);
2043 for (i, block) in received.iter().enumerate() {
2044 let expected_number = header.number - i as u64;
2045 assert_eq!(block.number, expected_number);
2046 }
2047 }
2048}