1use super::headers::client::HeadersRequest;
2use crate::{
3 block_access_lists::client::{BalRequirement, BlockAccessListsClient},
4 bodies::client::{BodiesClient, SingleBodyRequest},
5 download::DownloadClient,
6 error::PeerRequestResult,
7 headers::client::{HeadersClient, SingleHeaderRequest},
8 priority::Priority,
9 BlockClient,
10};
11use alloy_consensus::BlockHeader;
12use alloy_primitives::{keccak256, Bytes, Sealable, Sealed, B256};
13use core::marker::PhantomData;
14use futures::FutureExt;
15use reth_consensus::Consensus;
16use reth_eth_wire_types::{
17 BlockAccessLists, EthNetworkPrimitives, HeadersDirection, NetworkPrimitives,
18};
19use reth_network_peers::{PeerId, WithPeerId};
20use reth_primitives_traits::{Block, SealedBlock, SealedHeader};
21use std::{
22 cmp::Reverse,
23 collections::{HashMap, VecDeque},
24 fmt::Debug,
25 hash::Hash,
26 ops::RangeInclusive,
27 pin::Pin,
28 sync::Arc,
29 task::{ready, Context, Poll},
30};
31use tracing::{debug, trace};
32
33#[derive(Debug, Clone, PartialEq, Eq)]
35pub struct SealedBlockWith<B: Block, T = Option<Sealed<Bytes>>> {
36 block: SealedBlock<B>,
37 data: T,
38}
39
40pub type SealedBlockAccessList = Sealed<Bytes>;
42
43pub type SealedBlockWithAccessList<B> = SealedBlockWith<B, Option<SealedBlockAccessList>>;
45
46impl<B: Block, T> SealedBlockWith<B, T> {
47 pub const fn new(block: SealedBlock<B>, data: T) -> Self {
49 Self { block, data }
50 }
51
52 pub const fn block(&self) -> &SealedBlock<B> {
54 &self.block
55 }
56
57 pub const fn data(&self) -> &T {
59 &self.data
60 }
61
62 pub fn into_parts(self) -> (SealedBlock<B>, T) {
64 (self.block, self.data)
65 }
66}
67
68impl<B: Block> SealedBlockWithAccessList<B> {
69 pub const fn from_block(block: SealedBlock<B>) -> Self {
71 Self::new(block, None)
72 }
73
74 pub const fn access_list(&self) -> Option<&SealedBlockAccessList> {
76 self.data.as_ref()
77 }
78
79 pub const fn access_lists(&self) -> Option<&SealedBlockAccessList> {
81 self.data.as_ref()
82 }
83}
84
85impl<B: Block> From<SealedBlock<B>> for SealedBlockWith<B> {
86 fn from(block: SealedBlock<B>) -> Self {
87 Self::new(block, None)
88 }
89}
90
91#[derive(Debug, Clone)]
93pub struct FullBlockClient<Client>
94where
95 Client: BlockClient,
96{
97 client: Client,
98 consensus: Arc<dyn Consensus<Client::Block>>,
99}
100
101impl<Client> FullBlockClient<Client>
102where
103 Client: BlockClient,
104{
105 pub fn new(client: Client, consensus: Arc<dyn Consensus<Client::Block>>) -> Self {
107 Self { client, consensus }
108 }
109
110 #[cfg(any(test, feature = "test-utils"))]
112 pub fn test_client(client: Client) -> Self {
113 Self::new(client, Arc::new(reth_consensus::test_utils::TestConsensus::default()))
114 }
115}
116
117impl<Client> FullBlockClient<Client>
118where
119 Client: BlockClient,
120{
121 pub fn get_full_block(&self, hash: B256) -> FetchFullBlockFuture<Client> {
128 FetchFullBlockFuture::new(self.client.clone(), self.consensus.clone(), hash)
129 }
130
131 pub fn get_full_block_range(
141 &self,
142 hash: B256,
143 count: u64,
144 ) -> FetchFullBlockRangeFuture<Client> {
145 let client = self.client.clone();
146 FetchFullBlockRangeFuture {
147 start_hash: hash,
148 count,
149 request: FullBlockRangeRequest {
150 headers: Some(client.get_headers(HeadersRequest::falling(hash.into(), count))),
151 bodies: None,
152 },
153 client,
154 headers: None,
155 pending_headers: VecDeque::new(),
156 bodies: HashMap::default(),
157 consensus: Arc::clone(&self.consensus),
158 }
159 }
160}
161
162impl<Client> FullBlockClient<Client>
163where
164 Client: BlockClient + BlockAccessListsClient,
165{
166 pub fn get_full_block_with_access_lists(
174 &self,
175 hash: B256,
176 ) -> FetchFullBlockWithBalFuture<Client> {
177 self.get_full_block_with_access_lists_with_requirement(hash, BalRequirement::default())
178 }
179
180 pub fn get_full_block_with_access_lists_with_requirement(
188 &self,
189 hash: B256,
190 requirement: BalRequirement,
191 ) -> FetchFullBlockWithBalFuture<Client> {
192 let client = self.client.clone();
193 FetchFullBlockWithBalFuture {
194 block: FetchFullBlockFuture::new(client.clone(), self.consensus.clone(), hash),
195 block_result: None,
196 bal_request_state: BalRequestState::Pending(
197 client.get_block_access_lists_with_requirement(vec![hash], requirement),
198 ),
199 }
200 }
201
202 pub fn get_full_block_range_with_optional_access_lists(
209 &self,
210 hash: B256,
211 count: u64,
212 ) -> FetchFullBlockRangeWithBalFuture<Client> {
213 self.get_full_block_range_with_optional_access_lists_with_requirement(
214 hash,
215 count,
216 BalRequirement::default(),
217 )
218 }
219
220 pub fn get_full_block_range_with_optional_access_lists_with_requirement(
227 &self,
228 hash: B256,
229 count: u64,
230 requirement: BalRequirement,
231 ) -> FetchFullBlockRangeWithBalFuture<Client> {
232 let client = self.client.clone();
233 FetchFullBlockRangeWithBalFuture {
234 blocks: self.get_full_block_range(hash, count),
235 client,
236 block_result: None,
237 access_lists: OptionalBlockAccessListsState::WaitingForBlocks { requirement },
238 }
239 }
240}
241
242#[must_use = "futures do nothing unless polled"]
247pub struct FetchFullBlockFuture<Client>
248where
249 Client: BlockClient,
250{
251 client: Client,
252 consensus: Arc<dyn Consensus<Client::Block>>,
253 hash: B256,
254 request: FullBlockRequest<Client>,
255 header: Option<SealedHeader<Client::Header>>,
256 body: Option<BodyResponse<Client::Body>>,
257}
258
259impl<Client> FetchFullBlockFuture<Client>
260where
261 Client: BlockClient,
262{
263 fn new(client: Client, consensus: Arc<dyn Consensus<Client::Block>>, hash: B256) -> Self {
264 Self {
265 hash,
266 consensus,
267 request: FullBlockRequest {
268 header: Some(client.get_header(hash.into())),
269 body: Some(client.get_block_body(hash)),
270 },
271 client,
272 header: None,
273 body: None,
274 }
275 }
276
277 pub const fn hash(&self) -> &B256 {
279 &self.hash
280 }
281
282 pub fn block_number(&self) -> Option<u64> {
284 self.header.as_ref().map(|h| h.number())
285 }
286
287 fn take_block(&mut self) -> Option<SealedBlock<Client::Block>> {
289 if self.header.is_none() || self.body.is_none() {
290 return None
291 }
292
293 let header = self.header.take().unwrap();
294 let resp = self.body.take().unwrap();
295 match resp {
296 BodyResponse::Validated(body) => Some(SealedBlock::from_sealed_parts(header, body)),
297 BodyResponse::PendingValidation(resp) => {
298 if let Err(err) = self.consensus.validate_body_against_header(resp.data(), &header)
300 {
301 debug!(target: "downloaders", %err, hash=?header.hash(), "Received wrong body");
302 self.client.report_bad_message(resp.peer_id());
303 self.header = Some(header);
304 self.request.body = Some(self.client.get_block_body(self.hash));
305 return None
306 }
307 Some(SealedBlock::from_sealed_parts(header, resp.into_data()))
308 }
309 }
310 }
311
312 fn on_block_response(&mut self, resp: WithPeerId<Client::Body>) {
313 if let Some(ref header) = self.header {
314 if let Err(err) = self.consensus.validate_body_against_header(resp.data(), header) {
315 debug!(target: "downloaders", %err, hash=?header.hash(), "Received wrong body");
316 self.client.report_bad_message(resp.peer_id());
317 return
318 }
319 self.body = Some(BodyResponse::Validated(resp.into_data()));
320 return
321 }
322 self.body = Some(BodyResponse::PendingValidation(resp));
323 }
324}
325
326impl<Client> Future for FetchFullBlockFuture<Client>
327where
328 Client: BlockClient<Header: BlockHeader + Sealable> + 'static,
329{
330 type Output = SealedBlock<Client::Block>;
331
332 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
333 let this = self.get_mut();
334
335 let mut budget = 4;
337
338 loop {
339 match ready!(this.request.poll(cx)) {
340 ResponseResult::Header(res) => {
341 match res {
342 Ok(maybe_header) => {
343 let (peer, maybe_header) =
344 maybe_header.map(|h| h.map(SealedHeader::seal_slow)).split();
345 if let Some(header) = maybe_header {
346 if header.hash() == this.hash {
347 this.header = Some(header);
348 } else {
349 debug!(target: "downloaders", expected=?this.hash, received=?header.hash(), "Received wrong header");
350 this.client.report_bad_message(peer)
352 }
353 }
354 }
355 Err(err) => {
356 debug!(target: "downloaders", %err, ?this.hash, "Header download failed");
357 }
358 }
359
360 if this.header.is_none() {
361 this.request.header = Some(this.client.get_header(this.hash.into()));
363 }
364 }
365 ResponseResult::Body(res) => {
366 match res {
367 Ok(maybe_body) => {
368 if let Some(body) = maybe_body.transpose() {
369 this.on_block_response(body);
370 }
371 }
372 Err(err) => {
373 debug!(target: "downloaders", %err, ?this.hash, "Body download failed");
374 }
375 }
376 if this.body.is_none() {
377 this.request.body = Some(this.client.get_block_body(this.hash));
379 }
380 }
381 }
382
383 if let Some(res) = this.take_block() {
384 return Poll::Ready(res)
385 }
386
387 budget -= 1;
389 if budget == 0 {
390 cx.waker().wake_by_ref();
392 return Poll::Pending
393 }
394 }
395 }
396}
397
398#[must_use = "futures do nothing unless polled"]
403pub struct FetchFullBlockWithBalFuture<Client>
404where
405 Client: BlockClient + BlockAccessListsClient,
406{
407 block: FetchFullBlockFuture<Client>,
408 block_result: Option<SealedBlock<Client::Block>>,
409 bal_request_state: BalRequestState<<Client as BlockAccessListsClient>::Output>,
410}
411
412impl<Client> FetchFullBlockWithBalFuture<Client>
413where
414 Client: BlockClient<Header: BlockHeader> + BlockAccessListsClient,
415{
416 pub const fn hash(&self) -> &B256 {
418 self.block.hash()
419 }
420}
421
422impl<Client> FetchFullBlockWithBalFuture<Client>
423where
424 Client: BlockClient<Header: BlockHeader + Sealable> + BlockAccessListsClient + 'static,
425{
426 pub fn block_number(&self) -> Option<u64> {
428 self.block_result.as_ref().map(|block| block.number()).or_else(|| self.block.block_number())
429 }
430
431 fn poll_bal_request(&mut self, cx: &mut Context<'_>) -> Poll<()> {
437 let res = match &mut self.bal_request_state {
438 BalRequestState::Pending(fut) => ready!(fut.poll_unpin(cx)),
439 BalRequestState::Ready(_) => return Poll::Ready(()),
440 };
441
442 match res {
443 Ok(bal) => {
444 let (peer, access_lists) = bal.split();
445 match access_lists.0.len() {
446 0 => self.bal_request_state = BalRequestState::Ready(None),
447 1 => {
448 let access_list = access_lists.0.into_iter().next().expect("len checked");
449 self.bal_request_state =
450 BalRequestState::Ready(Some(WithPeerId::new(peer, access_list)));
451 }
452 received => {
453 debug!(
454 target: "downloaders",
455 hash = ?self.block.hash(),
456 expected = 1,
457 received,
458 "Received wrong access list response",
459 );
460 self.block.client.report_bad_message(peer);
461 self.bal_request_state = BalRequestState::Ready(None);
462 }
463 }
464 }
465 Err(err) => {
466 debug!(
467 target: "downloaders",
468 %err,
469 hash = ?self.block.hash(),
470 "Access list download failed",
471 );
472 self.bal_request_state = BalRequestState::Ready(None);
473 }
474 }
475
476 Poll::Ready(())
477 }
478
479 fn take_block_and_access_lists(&mut self) -> Option<SealedBlockWithAccessList<Client::Block>> {
485 let BalRequestState::Ready(access_list) = &mut self.bal_request_state else { return None };
486 let block = self.block_result.take()?;
487 let access_list = access_list.take().and_then(|access_list| {
488 match seal_block_access_list_for_block(&block, access_list) {
489 Ok(access_list) => access_list,
490 Err(peer) => {
491 self.block.client.report_bad_message(peer);
492 None
493 }
494 }
495 });
496 Some(SealedBlockWith::new(block, access_list))
497 }
498}
499
500impl<Client> Future for FetchFullBlockWithBalFuture<Client>
501where
502 Client: BlockClient<Header: BlockHeader + Sealable> + BlockAccessListsClient + 'static,
503{
504 type Output = SealedBlockWithAccessList<Client::Block>;
505
506 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
507 let this = self.get_mut();
508
509 if this.block_result.is_none() &&
510 let Poll::Ready(block) = this.block.poll_unpin(cx)
511 {
512 this.block_result = Some(block);
513 }
514
515 ready!(this.poll_bal_request(cx));
516
517 if let Some(res) = this.take_block_and_access_lists() {
518 return Poll::Ready(res)
519 }
520
521 Poll::Pending
522 }
523}
524
525impl<Client> Debug for FetchFullBlockWithBalFuture<Client>
526where
527 Client: BlockClient<Header: BlockHeader> + BlockAccessListsClient,
528{
529 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
530 f.debug_struct("FetchFullBlockWithBalFuture")
531 .field("hash", &self.block.hash())
532 .field("block_ready", &self.block_result.is_some())
533 .field("bal_request_ready", &self.bal_request_state.is_ready())
534 .finish()
535 }
536}
537
538enum BalRequestState<Req> {
540 Pending(Req),
541 Ready(Option<WithPeerId<Option<Bytes>>>),
542}
543
544impl<Req> BalRequestState<Req> {
545 const fn is_ready(&self) -> bool {
546 matches!(self, Self::Ready(_))
547 }
548}
549
550#[must_use = "futures do nothing unless polled"]
557#[expect(missing_debug_implementations)]
558pub struct FetchFullBlockRangeWithBalFuture<Client>
559where
560 Client: BlockClient + BlockAccessListsClient,
561{
562 blocks: FetchFullBlockRangeFuture<Client>,
563 client: Client,
564 block_result: Option<Vec<SealedBlock<Client::Block>>>,
565 access_lists: OptionalBlockAccessListsState<<Client as BlockAccessListsClient>::Output>,
566}
567
568impl<Client> FetchFullBlockRangeWithBalFuture<Client>
569where
570 Client: BlockClient<Header: Debug + BlockHeader + Sealable + Clone + Hash + Eq>
571 + BlockAccessListsClient,
572{
573 fn start_access_lists_request_if_possible(&mut self) {
574 let requirement = match &self.access_lists {
575 OptionalBlockAccessListsState::WaitingForBlocks { requirement } => *requirement,
576 OptionalBlockAccessListsState::Pending(_) | OptionalBlockAccessListsState::Ready(_) => {
577 return
578 }
579 };
580
581 let Some(blocks) = self.block_result.as_ref() else { return };
583 let hashes = blocks.iter().map(|block| block.hash()).collect::<Vec<_>>();
584 self.access_lists = OptionalBlockAccessListsState::Pending(
585 self.client.get_block_access_lists_with_requirement(hashes, requirement),
586 );
587 }
588
589 fn poll_access_lists(&mut self, cx: &mut Context<'_>) {
591 self.start_access_lists_request_if_possible();
592
593 let poll = match &mut self.access_lists {
594 OptionalBlockAccessListsState::Pending(fut) => fut.poll_unpin(cx),
595 OptionalBlockAccessListsState::WaitingForBlocks { .. } |
596 OptionalBlockAccessListsState::Ready(_) => return,
597 };
598
599 match poll {
600 Poll::Pending => {}
601 Poll::Ready(Ok(access_lists)) => {
602 self.access_lists = OptionalBlockAccessListsState::Ready(Some(access_lists));
603 }
604 Poll::Ready(Err(err)) => {
605 debug!(
606 target: "downloaders",
607 %err,
608 start_hash = ?self.blocks.start_hash(),
609 "Access list range download failed",
610 );
611
612 self.access_lists = OptionalBlockAccessListsState::Ready(None);
615 }
616 }
617 }
618
619 fn take_response(&mut self) -> Option<Vec<SealedBlockWithAccessList<Client::Block>>> {
621 let OptionalBlockAccessListsState::Ready(access_lists) = &mut self.access_lists else {
622 return None
623 };
624
625 let blocks = self.block_result.take()?;
626
627 Some(seal_blocks_with_access_lists(&self.client, blocks, access_lists.take()))
628 }
629}
630
631impl<Client> Future for FetchFullBlockRangeWithBalFuture<Client>
632where
633 Client: BlockClient<Header: Debug + BlockHeader + Sealable + Clone + Hash + Eq>
634 + BlockAccessListsClient
635 + 'static,
636{
637 type Output = Vec<SealedBlockWithAccessList<Client::Block>>;
638
639 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
640 let this = self.get_mut();
641
642 if this.block_result.is_none() &&
644 let Poll::Ready(blocks) = this.blocks.poll_unpin(cx)
645 {
646 this.block_result = Some(blocks);
647 }
648
649 this.poll_access_lists(cx);
650
651 if let Some(response) = this.take_response() {
652 return Poll::Ready(response)
653 }
654
655 Poll::Pending
656 }
657}
658
659enum OptionalBlockAccessListsState<Req> {
661 WaitingForBlocks {
663 requirement: BalRequirement,
665 },
666 Pending(Req),
668 Ready(Option<WithPeerId<BlockAccessLists>>),
670}
671
672impl<Client> Debug for FetchFullBlockFuture<Client>
673where
674 Client: BlockClient<Header: Debug, Body: Debug>,
675{
676 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
677 f.debug_struct("FetchFullBlockFuture")
678 .field("hash", &self.hash)
679 .field("header", &self.header)
680 .field("body", &self.body)
681 .finish()
682 }
683}
684
685struct FullBlockRequest<Client>
686where
687 Client: BlockClient,
688{
689 header: Option<SingleHeaderRequest<<Client as HeadersClient>::Output>>,
690 body: Option<SingleBodyRequest<<Client as BodiesClient>::Output>>,
691}
692
693impl<Client> FullBlockRequest<Client>
694where
695 Client: BlockClient,
696{
697 fn poll(&mut self, cx: &mut Context<'_>) -> Poll<ResponseResult<Client::Header, Client::Body>> {
698 if let Some(fut) = Pin::new(&mut self.header).as_pin_mut() &&
699 let Poll::Ready(res) = fut.poll(cx)
700 {
701 self.header = None;
702 return Poll::Ready(ResponseResult::Header(res))
703 }
704
705 if let Some(fut) = Pin::new(&mut self.body).as_pin_mut() &&
706 let Poll::Ready(res) = fut.poll(cx)
707 {
708 self.body = None;
709 return Poll::Ready(ResponseResult::Body(res))
710 }
711
712 Poll::Pending
713 }
714}
715
716enum ResponseResult<H, B> {
719 Header(PeerRequestResult<Option<H>>),
720 Body(PeerRequestResult<Option<B>>),
721}
722
723#[derive(Debug)]
725enum BodyResponse<B> {
726 Validated(B),
728 PendingValidation(WithPeerId<B>),
730}
731#[must_use = "futures do nothing unless polled"]
744#[expect(missing_debug_implementations)]
745pub struct FetchFullBlockRangeFuture<Client>
746where
747 Client: BlockClient,
748{
749 client: Client,
751 consensus: Arc<dyn Consensus<Client::Block>>,
753 start_hash: B256,
755 count: u64,
757 request: FullBlockRangeRequest<Client>,
759 headers: Option<Vec<SealedHeader<Client::Header>>>,
761 pending_headers: VecDeque<SealedHeader<Client::Header>>,
763 bodies: HashMap<SealedHeader<Client::Header>, BodyResponse<Client::Body>>,
765}
766
767impl<Client> FetchFullBlockRangeFuture<Client>
768where
769 Client: BlockClient<Header: Debug + BlockHeader + Sealable + Clone + Hash + Eq>,
770{
771 fn is_bodies_complete(&self) -> bool {
773 self.bodies.len() == self.count as usize
774 }
775
776 fn insert_body(&mut self, body_response: BodyResponse<Client::Body>) {
780 if let Some(header) = self.pending_headers.pop_front() {
781 self.bodies.insert(header, body_response);
782 }
783 }
784
785 fn insert_bodies(&mut self, bodies: impl IntoIterator<Item = BodyResponse<Client::Body>>) {
787 for body in bodies {
788 self.insert_body(body);
789 }
790 }
791
792 fn remaining_bodies_hashes(&self) -> Vec<B256> {
795 self.pending_headers.iter().map(|h| h.hash()).collect()
796 }
797
798 fn take_blocks(&mut self) -> Option<Vec<SealedBlock<Client::Block>>> {
806 if !self.is_bodies_complete() {
807 return None
809 }
810
811 let headers = self.headers.take()?;
812 let mut needs_retry = false;
813 let mut valid_responses = Vec::new();
814
815 for header in &headers {
816 if let Some(body_resp) = self.bodies.remove(header) {
817 let body = match body_resp {
819 BodyResponse::Validated(body) => body,
820 BodyResponse::PendingValidation(resp) => {
821 if let Err(err) =
823 self.consensus.validate_body_against_header(resp.data(), header)
824 {
825 debug!(target: "downloaders", %err, hash=?header.hash(), "Received wrong body in range response");
826 self.client.report_bad_message(resp.peer_id());
827
828 self.pending_headers.push_back(header.clone());
830 needs_retry = true;
831 continue
832 }
833
834 resp.into_data()
835 }
836 };
837
838 valid_responses
839 .push(SealedBlock::<Client::Block>::from_sealed_parts(header.clone(), body));
840 }
841 }
842
843 if needs_retry {
844 for block in valid_responses {
847 let (header, body) = block.split_sealed_header_body();
848 self.bodies.insert(header, BodyResponse::Validated(body));
849 }
850
851 self.headers = Some(headers);
853
854 let hashes = self.remaining_bodies_hashes();
856 self.request.bodies = Some(self.client.get_block_bodies(hashes));
857 return None
858 }
859
860 Some(valid_responses)
861 }
862
863 fn on_headers_response(&mut self, headers: WithPeerId<Vec<Client::Header>>) {
864 let (peer, mut headers_falling) =
865 headers.map(|h| h.into_iter().map(SealedHeader::seal_slow).collect::<Vec<_>>()).split();
866
867 if headers_falling.len() == self.count as usize {
869 headers_falling.sort_unstable_by_key(|h| Reverse(h.number()));
871
872 if headers_falling[0].hash() == self.start_hash {
874 let headers_rising = headers_falling.iter().rev().cloned().collect::<Vec<_>>();
875 if let Err(err) = self.consensus.validate_header_range(&headers_rising) {
877 debug!(target: "downloaders", %err, ?self.start_hash, "Received bad header response");
878 self.client.report_bad_message(peer);
879 }
880
881 let hashes = headers_falling.iter().map(|h| h.hash()).collect::<Vec<_>>();
883
884 self.pending_headers = headers_falling.clone().into();
886
887 if !self.has_bodies_request_started() {
889 self.request.bodies = Some(self.client.get_block_bodies(hashes));
891 }
892
893 self.headers = Some(headers_falling);
895 } else {
896 self.client.report_bad_message(peer);
898 }
899 }
900 }
901
902 const fn has_bodies_request_started(&self) -> bool {
905 self.request.bodies.is_some()
906 }
907
908 pub const fn start_hash(&self) -> B256 {
910 self.start_hash
911 }
912
913 pub const fn count(&self) -> u64 {
915 self.count
916 }
917}
918
919impl<Client> Future for FetchFullBlockRangeFuture<Client>
920where
921 Client: BlockClient<Header: Debug + BlockHeader + Sealable + Clone + Hash + Eq> + 'static,
922{
923 type Output = Vec<SealedBlock<Client::Block>>;
924
925 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
926 let this = self.get_mut();
927
928 loop {
929 match ready!(this.request.poll(cx)) {
930 RangeResponseResult::Header(res) => {
939 match res {
940 Ok(headers) => {
941 this.on_headers_response(headers);
942 }
943 Err(err) => {
944 debug!(target: "downloaders", %err, ?this.start_hash, "Header range download failed");
945 }
946 }
947
948 if this.headers.is_none() {
949 this.request.headers = Some(this.client.get_headers(HeadersRequest {
951 start: this.start_hash.into(),
952 limit: this.count,
953 direction: HeadersDirection::Falling,
954 }));
955 }
956 }
957 RangeResponseResult::Body(res) => {
963 match res {
964 Ok(bodies_resp) => {
965 let (peer, new_bodies) = bodies_resp.split();
966
967 this.insert_bodies(
969 new_bodies
970 .into_iter()
971 .map(|resp| WithPeerId::new(peer, resp))
972 .map(BodyResponse::PendingValidation),
973 );
974
975 if !this.is_bodies_complete() {
976 let req_hashes = this.remaining_bodies_hashes();
978
979 this.request.bodies = Some(this.client.get_block_bodies(req_hashes))
981 }
982 }
983 Err(err) => {
984 debug!(target: "downloaders", %err, ?this.start_hash, "Body range download failed");
985 }
986 }
987 if this.request.bodies.is_none() && !this.is_bodies_complete() {
988 let hashes = this.remaining_bodies_hashes();
1001 if !hashes.is_empty() {
1002 this.request.bodies = Some(this.client.get_block_bodies(hashes));
1003 }
1004 }
1005 }
1006 }
1007
1008 if let Some(res) = this.take_blocks() {
1009 return Poll::Ready(res)
1010 }
1011 }
1012 }
1013}
1014
1015struct FullBlockRangeRequest<Client>
1019where
1020 Client: BlockClient,
1021{
1022 headers: Option<<Client as HeadersClient>::Output>,
1023 bodies: Option<<Client as BodiesClient>::Output>,
1024}
1025
1026impl<Client> FullBlockRangeRequest<Client>
1027where
1028 Client: BlockClient,
1029{
1030 fn poll(
1031 &mut self,
1032 cx: &mut Context<'_>,
1033 ) -> Poll<RangeResponseResult<Client::Header, Client::Body>> {
1034 if let Some(fut) = Pin::new(&mut self.headers).as_pin_mut() &&
1035 let Poll::Ready(res) = fut.poll(cx)
1036 {
1037 self.headers = None;
1038 return Poll::Ready(RangeResponseResult::Header(res))
1039 }
1040
1041 if let Some(fut) = Pin::new(&mut self.bodies).as_pin_mut() &&
1042 let Poll::Ready(res) = fut.poll(cx)
1043 {
1044 self.bodies = None;
1045 return Poll::Ready(RangeResponseResult::Body(res))
1046 }
1047
1048 Poll::Pending
1049 }
1050}
1051
1052enum RangeResponseResult<H, B> {
1055 Header(PeerRequestResult<Vec<H>>),
1056 Body(PeerRequestResult<Vec<B>>),
1057}
1058
1059#[derive(Debug, Clone)]
1061#[non_exhaustive]
1062pub struct NoopFullBlockClient<Net = EthNetworkPrimitives>(PhantomData<Net>);
1063
1064impl<Net> DownloadClient for NoopFullBlockClient<Net>
1066where
1067 Net: Debug + Send + Sync,
1068{
1069 fn report_bad_message(&self, _peer_id: PeerId) {}
1076
1077 fn num_connected_peers(&self) -> usize {
1083 0
1084 }
1085}
1086
1087impl<Net> BodiesClient for NoopFullBlockClient<Net>
1089where
1090 Net: NetworkPrimitives,
1091{
1092 type Body = Net::BlockBody;
1093 type Output = futures::future::Ready<PeerRequestResult<Vec<Self::Body>>>;
1095
1096 fn get_block_bodies_with_priority_and_range_hint(
1107 &self,
1108 _hashes: Vec<B256>,
1109 _priority: Priority,
1110 _range_hint: Option<RangeInclusive<u64>>,
1111 ) -> Self::Output {
1112 futures::future::ready(Ok(WithPeerId::new(PeerId::random(), vec![])))
1115 }
1116}
1117
1118impl<Net> HeadersClient for NoopFullBlockClient<Net>
1119where
1120 Net: NetworkPrimitives,
1121{
1122 type Header = Net::BlockHeader;
1123 type Output = futures::future::Ready<PeerRequestResult<Vec<Self::Header>>>;
1126
1127 fn get_headers_with_priority(
1141 &self,
1142 _request: HeadersRequest,
1143 _priority: Priority,
1144 ) -> Self::Output {
1145 futures::future::ready(Ok(WithPeerId::new(PeerId::random(), vec![])))
1146 }
1147}
1148
1149impl<Net> BlockClient for NoopFullBlockClient<Net>
1150where
1151 Net: NetworkPrimitives,
1152{
1153 type Block = Net::Block;
1154}
1155
1156impl<Net> Default for NoopFullBlockClient<Net> {
1157 fn default() -> Self {
1158 Self(PhantomData::<Net>)
1159 }
1160}
1161
1162fn seal_block_access_list_for_block<B: Block>(
1168 block: &SealedBlock<B>,
1169 access_list: WithPeerId<Option<Bytes>>,
1170) -> Result<Option<Sealed<Bytes>>, PeerId> {
1171 let Some(expected) = block.header().block_access_list_hash() else { return Ok(None) };
1172
1173 let (peer, access_list) = access_list.split();
1174 let Some(access_list) = access_list else { return Ok(None) };
1175 let computed = keccak256(access_list.as_ref());
1176 if computed == expected {
1177 return Ok(Some(Sealed::new_unchecked(access_list, expected)))
1178 }
1179
1180 debug!(
1181 target: "downloaders",
1182 block_hash = ?block.hash(),
1183 ?computed,
1184 ?expected,
1185 "Received block access list with wrong hash",
1186 );
1187 Err(peer)
1188}
1189
1190fn seal_blocks_with_access_lists<Client>(
1197 client: &Client,
1198 blocks: Vec<SealedBlock<Client::Block>>,
1199 access_lists: Option<WithPeerId<BlockAccessLists>>,
1200) -> Vec<SealedBlockWithAccessList<Client::Block>>
1201where
1202 Client: BlockClient,
1203{
1204 let Some(access_lists) = access_lists else {
1205 return blocks.into_iter().map(SealedBlockWith::from_block).collect()
1206 };
1207
1208 let (peer, access_lists) = access_lists.split();
1209 let expected = blocks.len();
1210 let received = access_lists.0.len();
1211
1212 if received > expected {
1213 trace!(
1214 target: "downloaders",
1215 expected,
1216 received,
1217 "Ignoring overlong access list range response",
1218 );
1219 return blocks.into_iter().map(SealedBlockWith::from_block).collect()
1220 }
1221
1222 let mut access_lists = access_lists.0.into_iter();
1223 let mut blocks = blocks.into_iter();
1224 let mut response = Vec::with_capacity(expected);
1225
1226 for block in blocks.by_ref() {
1227 let Some(access_list) = access_lists.next() else {
1228 response.push(SealedBlockWith::from_block(block));
1231 break
1232 };
1233
1234 match seal_block_access_list_for_block(&block, WithPeerId::new(peer, access_list)) {
1235 Ok(access_list) => response.push(SealedBlockWith::new(block, access_list)),
1236 Err(peer) => {
1237 client.report_bad_message(peer);
1240 response.push(SealedBlockWith::from_block(block));
1241 break
1242 }
1243 }
1244 }
1245
1246 response.extend(blocks.map(SealedBlockWith::from_block));
1247 response
1248}
1249
1250#[cfg(test)]
1251mod tests {
1252 use reth_ethereum_primitives::BlockBody;
1253
1254 use super::*;
1255 use crate::{error::RequestError, test_utils::TestFullBlockClient};
1256 use alloy_consensus::Header;
1257 use alloy_primitives::{keccak256, map::B256Map, Bytes};
1258 use parking_lot::Mutex;
1259 use std::{
1260 ops::Range,
1261 sync::{
1262 atomic::{AtomicBool, AtomicUsize, Ordering},
1263 Arc,
1264 },
1265 };
1266
1267 const EMPTY_LIST_CODE: u8 = 0xc0;
1268 use tokio::time::{timeout, Duration};
1269
1270 fn sealed_access_list(access_list: Bytes) -> Sealed<Bytes> {
1271 let hash = keccak256(access_list.as_ref());
1272 Sealed::new_unchecked(access_list, hash)
1273 }
1274
1275 fn sealed_header_with_access_list_hash(access_list: &Bytes) -> SealedHeader {
1276 let header = Header {
1277 block_access_list_hash: Some(keccak256(access_list.as_ref())),
1278 ..Default::default()
1279 };
1280 SealedHeader::seal_slow(header)
1281 }
1282
1283 fn range_access_lists<B: Block>(
1284 blocks: &[SealedBlockWithAccessList<B>],
1285 ) -> Vec<Option<Sealed<Bytes>>> {
1286 blocks.iter().map(|block| block.access_list().cloned()).collect()
1287 }
1288
1289 #[tokio::test]
1290 async fn download_single_full_block() {
1291 let client = TestFullBlockClient::default();
1292 let header: SealedHeader = SealedHeader::default();
1293 let body = BlockBody::default();
1294 client.insert(header.clone(), body.clone());
1295 let client = FullBlockClient::test_client(client);
1296
1297 let received = client.get_full_block(header.hash()).await;
1298 assert_eq!(received, SealedBlock::from_sealed_parts(header, body));
1299 }
1300
1301 #[tokio::test]
1302 async fn download_single_full_block_range() {
1303 let client = TestFullBlockClient::default();
1304 let header: SealedHeader = SealedHeader::default();
1305 let body = BlockBody::default();
1306 client.insert(header.clone(), body.clone());
1307 let client = FullBlockClient::test_client(client);
1308
1309 let received = client.get_full_block_range(header.hash(), 1).await;
1310 let received = received.first().expect("response should include a block");
1311 assert_eq!(*received, SealedBlock::from_sealed_parts(header, body));
1312 }
1313
1314 #[tokio::test]
1315 async fn download_single_full_block_with_access_lists() {
1316 let client = FullBlockWithAccessListsClient::default();
1317 let body = BlockBody::default();
1318 let access_list = Bytes::from_static(&[EMPTY_LIST_CODE]);
1319 let header = sealed_header_with_access_list_hash(&access_list);
1320 client.insert(header.clone(), body.clone(), access_list.clone());
1321
1322 let request_count = Arc::clone(&client.access_list_requests);
1323 let client = FullBlockClient::test_client(client);
1324
1325 let received = client.get_full_block_with_access_lists(header.hash()).await;
1326 let expected_access_list = sealed_access_list(access_list);
1327
1328 assert_eq!(received.block(), &SealedBlock::from_sealed_parts(header, body));
1329 assert_eq!(received.access_list(), Some(&expected_access_list));
1330 assert_eq!(request_count.load(Ordering::SeqCst), 1);
1331 }
1332
1333 #[tokio::test]
1334 async fn download_single_full_block_with_access_lists_uses_requested_requirement() {
1335 let client = FullBlockWithAccessListsClient::default();
1336 let body = BlockBody::default();
1337 let access_list = Bytes::from_static(&[EMPTY_LIST_CODE]);
1338 let header = sealed_header_with_access_list_hash(&access_list);
1339 client.insert(header.clone(), body.clone(), access_list.clone());
1340
1341 let requirement = Arc::clone(&client.last_access_list_requirement);
1342 let client = FullBlockClient::test_client(client);
1343
1344 let received = client
1345 .get_full_block_with_access_lists_with_requirement(
1346 header.hash(),
1347 BalRequirement::Mandatory,
1348 )
1349 .await;
1350
1351 let expected_access_list = sealed_access_list(access_list);
1352 assert_eq!(received.block(), &SealedBlock::from_sealed_parts(header, body));
1353 assert_eq!(received.access_list(), Some(&expected_access_list));
1354 assert_eq!(*requirement.lock(), Some(BalRequirement::Mandatory));
1355 }
1356
1357 #[tokio::test]
1358 async fn download_single_full_block_with_access_lists_waits_for_pending_access_lists() {
1359 let client = FullBlockWithAccessListsClient::default();
1360 client.set_access_list_pending_polls(1);
1361
1362 let body = BlockBody::default();
1363 let access_list = Bytes::from_static(&[EMPTY_LIST_CODE]);
1364 let header = sealed_header_with_access_list_hash(&access_list);
1365 client.insert(header.clone(), body.clone(), access_list.clone());
1366
1367 let request_count = Arc::clone(&client.access_list_requests);
1368 let client = FullBlockClient::test_client(client);
1369
1370 let received =
1371 timeout(Duration::from_secs(1), client.get_full_block_with_access_lists(header.hash()))
1372 .await
1373 .expect("access list request should complete");
1374
1375 let expected_access_list = sealed_access_list(access_list);
1376 assert_eq!(received.block(), &SealedBlock::from_sealed_parts(header, body));
1377 assert_eq!(received.access_list(), Some(&expected_access_list));
1378 assert_eq!(request_count.load(Ordering::SeqCst), 1);
1379 }
1380
1381 #[tokio::test]
1382 async fn download_single_full_block_with_access_lists_rejects_wrong_hash() {
1383 let client = FullBlockWithAccessListsClient::default();
1384 let body = BlockBody::default();
1385 let expected_access_list = Bytes::from_static(&[EMPTY_LIST_CODE]);
1386 let wrong_access_list = Bytes::from_static(&[0xc1, 0x01]);
1387 let header = sealed_header_with_access_list_hash(&expected_access_list);
1388 client.insert(header.clone(), body.clone(), wrong_access_list);
1389
1390 let bad_messages = Arc::clone(&client.bad_messages);
1391 let client = FullBlockClient::test_client(client);
1392
1393 let received =
1394 timeout(Duration::from_secs(1), client.get_full_block_with_access_lists(header.hash()))
1395 .await
1396 .expect("block request should complete without access lists");
1397
1398 assert_eq!(received.block(), &SealedBlock::from_sealed_parts(header, body));
1399 assert!(received.access_list().is_none());
1400 assert_eq!(bad_messages.load(Ordering::SeqCst), 1);
1401 }
1402
1403 #[tokio::test]
1404 async fn download_single_full_block_with_access_lists_treats_none_as_unavailable() {
1405 let client = FullBlockWithAccessListsClient::default();
1406 let body = BlockBody::default();
1407 let expected_access_list = Bytes::from_static(&[0xc1, 0x01]);
1408 let header = sealed_header_with_access_list_hash(&expected_access_list);
1409 client.inner.insert(header.clone(), body.clone());
1410
1411 let bad_messages = Arc::clone(&client.bad_messages);
1412 let client = FullBlockClient::test_client(client);
1413
1414 let received =
1415 timeout(Duration::from_secs(1), client.get_full_block_with_access_lists(header.hash()))
1416 .await
1417 .expect("block request should complete without access lists");
1418
1419 assert_eq!(received.block(), &SealedBlock::from_sealed_parts(header, body));
1420 assert!(received.access_list().is_none());
1421 assert_eq!(bad_messages.load(Ordering::SeqCst), 0);
1422 }
1423
1424 #[tokio::test]
1425 async fn download_single_full_block_with_access_lists_rejects_wrong_empty_list() {
1426 let client = FullBlockWithAccessListsClient::default();
1427 let body = BlockBody::default();
1428 let expected_access_list = Bytes::from_static(&[0xc1, 0x01]);
1429 let wrong_empty_access_list = Bytes::from_static(&[EMPTY_LIST_CODE]);
1430 let header = sealed_header_with_access_list_hash(&expected_access_list);
1431 client.insert(header.clone(), body.clone(), wrong_empty_access_list);
1432
1433 let bad_messages = Arc::clone(&client.bad_messages);
1434 let client = FullBlockClient::test_client(client);
1435
1436 let received =
1437 timeout(Duration::from_secs(1), client.get_full_block_with_access_lists(header.hash()))
1438 .await
1439 .expect("block request should complete without access lists");
1440
1441 assert_eq!(received.block(), &SealedBlock::from_sealed_parts(header, body));
1442 assert!(received.access_list().is_none());
1443 assert_eq!(bad_messages.load(Ordering::SeqCst), 1);
1444 }
1445
1446 #[tokio::test]
1447 async fn download_single_full_block_with_access_lists_returns_none_after_empty_response() {
1448 let client = FullBlockWithAccessListsClient::default();
1449 client.empty_first_response.store(true, Ordering::SeqCst);
1450
1451 let body = BlockBody::default();
1452 let access_list = Bytes::from_static(&[EMPTY_LIST_CODE]);
1453 let header = sealed_header_with_access_list_hash(&access_list);
1454 client.insert(header.clone(), body.clone(), access_list.clone());
1455
1456 let request_count = Arc::clone(&client.access_list_requests);
1457 let bad_messages = Arc::clone(&client.bad_messages);
1458 let client = FullBlockClient::test_client(client);
1459
1460 let received =
1461 timeout(Duration::from_secs(1), client.get_full_block_with_access_lists(header.hash()))
1462 .await
1463 .expect("block request should complete without access lists");
1464
1465 assert_eq!(received.block(), &SealedBlock::from_sealed_parts(header, body));
1466 assert!(received.access_list().is_none());
1467 assert_eq!(request_count.load(Ordering::SeqCst), 1);
1468 assert_eq!(bad_messages.load(Ordering::SeqCst), 0);
1469 }
1470
1471 #[tokio::test]
1472 async fn download_single_full_block_with_access_lists_returns_block_when_unavailable() {
1473 let client = FullBlockWithAccessListsClient::default();
1474 client.set_access_lists_unsupported(true);
1475
1476 let body = BlockBody::default();
1477 let access_list = Bytes::from_static(&[EMPTY_LIST_CODE]);
1478 let header = sealed_header_with_access_list_hash(&access_list);
1479 client.insert(header.clone(), body.clone(), access_list);
1480
1481 let request_count = Arc::clone(&client.access_list_requests);
1482 let requirement = Arc::clone(&client.last_access_list_requirement);
1483 let client = FullBlockClient::test_client(client);
1484
1485 let received =
1486 timeout(Duration::from_secs(1), client.get_full_block_with_access_lists(header.hash()))
1487 .await
1488 .expect("block request should complete without access lists");
1489
1490 assert_eq!(received.block(), &SealedBlock::from_sealed_parts(header, body));
1491 assert!(received.access_list().is_none());
1492 assert_eq!(request_count.load(Ordering::SeqCst), 1);
1493 assert_eq!(
1494 *requirement.lock(),
1495 Some(BalRequirement::Optional),
1496 "single block BAL lookup should be best-effort"
1497 );
1498 }
1499
1500 fn insert_headers_into_client(
1502 client: &TestFullBlockClient,
1503 range: Range<usize>,
1504 ) -> (SealedHeader, BlockBody) {
1505 let mut sealed_header: SealedHeader = SealedHeader::default();
1506 let body = BlockBody::default();
1507 for _ in range {
1508 let (mut header, hash) = sealed_header.split();
1509 header.parent_hash = hash;
1511 header.number += 1;
1512
1513 sealed_header = SealedHeader::seal_slow(header);
1514
1515 client.insert(sealed_header.clone(), body.clone());
1516 }
1517
1518 (sealed_header, body)
1519 }
1520
    /// Test client that wraps a [`TestFullBlockClient`] and additionally serves block access
    /// lists from an in-memory map.
    ///
    /// All knobs and counters are shared behind `Arc`s so a test can keep a handle after the
    /// client itself has been moved.
    #[derive(Clone, Debug)]
    struct FullBlockWithAccessListsClient {
        /// Wrapped client that answers header and body requests.
        inner: TestFullBlockClient,
        /// Raw access-list bytes keyed by block hash.
        access_lists: Arc<Mutex<B256Map<Bytes>>>,
        /// Number of access-list requests received so far.
        access_list_requests: Arc<AtomicUsize>,
        /// Maximum number of requested hashes answered per access-list request.
        access_list_soft_limit: Arc<AtomicUsize>,
        /// How many times the next access-list response reports `Pending` before resolving;
        /// reset to zero by the request that consumes it.
        access_list_pending_polls: Arc<AtomicUsize>,
        /// Number of extra `None` entries appended to each regular access-list response.
        extra_access_list_entries: Arc<AtomicUsize>,
        /// When set, access-list requests fail with `RequestError::UnsupportedCapability`.
        unsupported_access_lists: Arc<AtomicBool>,
        /// Requirement passed to the most recent access-list request, if any.
        last_access_list_requirement: Arc<Mutex<Option<BalRequirement>>>,
        /// Number of `report_bad_message` calls observed.
        bad_messages: Arc<AtomicUsize>,
        /// When set, the next access-list request is answered with an empty list and the flag
        /// is cleared.
        empty_first_response: Arc<AtomicBool>,
    }
1534
1535 impl Default for FullBlockWithAccessListsClient {
1536 fn default() -> Self {
1537 Self {
1538 inner: TestFullBlockClient::default(),
1539 access_lists: Arc::new(Mutex::new(B256Map::default())),
1540 access_list_requests: Arc::new(AtomicUsize::new(0)),
1541 access_list_soft_limit: Arc::new(AtomicUsize::new(usize::MAX)),
1542 access_list_pending_polls: Arc::new(AtomicUsize::new(0)),
1543 extra_access_list_entries: Arc::new(AtomicUsize::new(0)),
1544 unsupported_access_lists: Arc::new(AtomicBool::new(false)),
1545 last_access_list_requirement: Arc::new(Mutex::new(None)),
1546 bad_messages: Arc::new(AtomicUsize::new(0)),
1547 empty_first_response: Arc::new(AtomicBool::new(false)),
1548 }
1549 }
1550 }
1551
1552 impl FullBlockWithAccessListsClient {
1553 fn insert(&self, header: SealedHeader, body: BlockBody, access_list: Bytes) {
1554 self.inner.insert(header.clone(), body);
1555 self.access_lists.lock().insert(header.hash(), access_list);
1556 }
1557
1558 fn set_access_list_soft_limit(&self, limit: usize) {
1559 self.access_list_soft_limit.store(limit, Ordering::SeqCst);
1560 }
1561
1562 fn set_access_list_pending_polls(&self, polls: usize) {
1563 self.access_list_pending_polls.store(polls, Ordering::SeqCst);
1564 }
1565
1566 fn set_extra_access_list_entries(&self, count: usize) {
1567 self.extra_access_list_entries.store(count, Ordering::SeqCst);
1568 }
1569
1570 fn set_access_lists_unsupported(&self, unsupported: bool) {
1571 self.unsupported_access_lists.store(unsupported, Ordering::SeqCst);
1572 }
1573 }
1574
1575 fn insert_headers_with_access_lists_into_client(
1577 client: &FullBlockWithAccessListsClient,
1578 range: Range<usize>,
1579 ) -> (SealedHeader, BlockBody) {
1580 let mut sealed_header: SealedHeader = SealedHeader::default();
1581 let body = BlockBody::default();
1582 for block_idx in range {
1583 let (mut header, hash) = sealed_header.split();
1584 header.parent_hash = hash;
1585 header.number += 1;
1586 let access_list = Bytes::from(vec![0xc1, block_idx as u8]);
1587 header.block_access_list_hash = Some(keccak256(access_list.as_ref()));
1588
1589 sealed_header = SealedHeader::seal_slow(header);
1590
1591 client.insert(sealed_header.clone(), body.clone(), access_list);
1592 }
1593
1594 (sealed_header, body)
1595 }
1596
    /// Forwards peer bookkeeping to the wrapped client while counting bad-message reports.
    impl DownloadClient for FullBlockWithAccessListsClient {
        /// Increments the `bad_messages` counter, then forwards the report to the wrapped
        /// client.
        fn report_bad_message(&self, peer_id: PeerId) {
            self.bad_messages.fetch_add(1, Ordering::SeqCst);
            self.inner.report_bad_message(peer_id);
        }

        /// Returns the wrapped client's connected-peer count.
        fn num_connected_peers(&self) -> usize {
            self.inner.num_connected_peers()
        }
    }
1607
    /// Header requests are delegated unchanged to the wrapped [`TestFullBlockClient`].
    impl HeadersClient for FullBlockWithAccessListsClient {
        type Header = <TestFullBlockClient as HeadersClient>::Header;
        type Output = <TestFullBlockClient as HeadersClient>::Output;

        /// Passes `request` and `priority` straight through to the wrapped client.
        fn get_headers_with_priority(
            &self,
            request: HeadersRequest,
            priority: Priority,
        ) -> Self::Output {
            self.inner.get_headers_with_priority(request, priority)
        }
    }
1620
    /// Body requests are delegated unchanged to the wrapped [`TestFullBlockClient`].
    impl BodiesClient for FullBlockWithAccessListsClient {
        type Body = <TestFullBlockClient as BodiesClient>::Body;
        type Output = <TestFullBlockClient as BodiesClient>::Output;

        /// Passes `hashes`, `priority` and `range_hint` straight through to the wrapped client.
        fn get_block_bodies_with_priority_and_range_hint(
            &self,
            hashes: Vec<B256>,
            priority: Priority,
            range_hint: Option<RangeInclusive<u64>>,
        ) -> Self::Output {
            self.inner.get_block_bodies_with_priority_and_range_hint(hashes, priority, range_hint)
        }
    }
1634
    /// Future that yields a prepared access-list response after reporting `Pending` a
    /// configurable number of times.
    struct MaybePendingAccessLists {
        /// Response to hand out; taken (left as `None`) once the future resolves.
        response: Option<PeerRequestResult<BlockAccessLists>>,
        /// Remaining number of polls that will return `Poll::Pending`.
        pending_polls: usize,
    }
1639
    impl MaybePendingAccessLists {
        /// Wraps `response` so that it is delivered after `pending_polls` pending polls.
        const fn new(response: PeerRequestResult<BlockAccessLists>, pending_polls: usize) -> Self {
            Self { response: Some(response), pending_polls }
        }
    }
1645
1646 impl std::future::Future for MaybePendingAccessLists {
1647 type Output = PeerRequestResult<BlockAccessLists>;
1648
1649 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1650 if self.pending_polls > 0 {
1651 self.pending_polls -= 1;
1652 cx.waker().wake_by_ref();
1653 return Poll::Pending
1654 }
1655
1656 Poll::Ready(self.response.take().expect("future polled after completion"))
1657 }
1658 }
1659
1660 impl BlockAccessListsClient for FullBlockWithAccessListsClient {
1661 type Output = MaybePendingAccessLists;
1662
1663 fn get_block_access_lists_with_priority_and_requirement(
1664 &self,
1665 hashes: Vec<B256>,
1666 _priority: Priority,
1667 requirement: BalRequirement,
1668 ) -> Self::Output {
1669 self.access_list_requests.fetch_add(1, Ordering::SeqCst);
1670 *self.last_access_list_requirement.lock() = Some(requirement);
1671 let pending_polls = self.access_list_pending_polls.swap(0, Ordering::SeqCst);
1672
1673 if self.unsupported_access_lists.load(Ordering::SeqCst) {
1674 return MaybePendingAccessLists::new(
1675 Err(RequestError::UnsupportedCapability),
1676 pending_polls,
1677 )
1678 }
1679
1680 if self.empty_first_response.swap(false, Ordering::SeqCst) {
1681 return MaybePendingAccessLists::new(
1682 Ok(WithPeerId::new(PeerId::random(), BlockAccessLists(Vec::new()))),
1683 pending_polls,
1684 )
1685 }
1686
1687 let mut access_lists: Vec<_> = hashes
1688 .into_iter()
1689 .take(self.access_list_soft_limit.load(Ordering::SeqCst))
1690 .map(|hash| self.access_lists.lock().get(&hash).cloned())
1691 .collect();
1692 for _ in 0..self.extra_access_list_entries.load(Ordering::SeqCst) {
1693 access_lists.push(None);
1694 }
1695
1696 MaybePendingAccessLists::new(
1697 Ok(WithPeerId::new(PeerId::random(), BlockAccessLists(access_lists))),
1698 pending_polls,
1699 )
1700 }
1701 }
1702
    /// Marks the mock as a full block client over `reth_ethereum_primitives::Block`.
    impl BlockClient for FullBlockWithAccessListsClient {
        type Block = reth_ethereum_primitives::Block;
    }
1706
    /// Test client that wraps a [`TestFullBlockClient`] and fails one chosen body request with
    /// a timeout.
    #[derive(Clone, Debug)]
    struct FailingBodiesClient {
        /// Wrapped client that answers all other requests.
        inner: TestFullBlockClient,
        /// Zero-based index of the body request that is answered with `RequestError::Timeout`.
        fail_on: usize,
        /// Number of body requests received so far.
        body_requests: Arc<AtomicUsize>,
    }
1713
    impl FailingBodiesClient {
        /// Wraps `inner`; the body request with zero-based index `fail_on` will time out.
        fn new(inner: TestFullBlockClient, fail_on: usize) -> Self {
            Self { inner, fail_on, body_requests: Arc::new(AtomicUsize::new(0)) }
        }
    }
1719
    /// Peer bookkeeping is delegated unchanged to the wrapped client.
    impl DownloadClient for FailingBodiesClient {
        /// Forwards the bad-message report to the wrapped client.
        fn report_bad_message(&self, peer_id: PeerId) {
            self.inner.report_bad_message(peer_id);
        }

        /// Returns the wrapped client's connected-peer count.
        fn num_connected_peers(&self) -> usize {
            self.inner.num_connected_peers()
        }
    }
1729
    /// Header requests are delegated unchanged to the wrapped [`TestFullBlockClient`].
    impl HeadersClient for FailingBodiesClient {
        type Header = <TestFullBlockClient as HeadersClient>::Header;
        type Output = <TestFullBlockClient as HeadersClient>::Output;

        /// Passes `request` and `priority` straight through to the wrapped client.
        fn get_headers_with_priority(
            &self,
            request: HeadersRequest,
            priority: Priority,
        ) -> Self::Output {
            self.inner.get_headers_with_priority(request, priority)
        }
    }
1742
1743 impl BodiesClient for FailingBodiesClient {
1744 type Body = <TestFullBlockClient as BodiesClient>::Body;
1745 type Output = <TestFullBlockClient as BodiesClient>::Output;
1746
1747 fn get_block_bodies_with_priority_and_range_hint(
1748 &self,
1749 hashes: Vec<B256>,
1750 priority: Priority,
1751 range_hint: Option<RangeInclusive<u64>>,
1752 ) -> Self::Output {
1753 let attempt = self.body_requests.fetch_add(1, Ordering::SeqCst);
1754 if attempt == self.fail_on {
1755 return futures::future::ready(Err(RequestError::Timeout))
1756 }
1757
1758 self.inner.get_block_bodies_with_priority_and_range_hint(hashes, priority, range_hint)
1759 }
1760 }
1761
    /// Marks the mock as a full block client over `reth_ethereum_primitives::Block`.
    impl BlockClient for FailingBodiesClient {
        type Block = reth_ethereum_primitives::Block;
    }
1765
1766 #[tokio::test]
1767 async fn download_full_block_range() {
1768 let client = TestFullBlockClient::default();
1769 let (header, body) = insert_headers_into_client(&client, 0..50);
1770 let client = FullBlockClient::test_client(client);
1771
1772 let received = client.get_full_block_range(header.hash(), 1).await;
1773 let received = received.first().expect("response should include a block");
1774 assert_eq!(*received, SealedBlock::from_sealed_parts(header.clone(), body));
1775
1776 let received = client.get_full_block_range(header.hash(), 10).await;
1777 assert_eq!(received.len(), 10);
1778 for (i, block) in received.iter().enumerate() {
1779 let expected_number = header.number - i as u64;
1780 assert_eq!(block.number, expected_number);
1781 }
1782 }
1783
1784 #[tokio::test]
1785 async fn download_full_block_range_over_soft_limit() {
1786 let client = TestFullBlockClient::default();
1788 let (header, body) = insert_headers_into_client(&client, 0..50);
1789 let client = FullBlockClient::test_client(client);
1790
1791 let received = client.get_full_block_range(header.hash(), 1).await;
1792 let received = received.first().expect("response should include a block");
1793 assert_eq!(*received, SealedBlock::from_sealed_parts(header.clone(), body));
1794
1795 let received = client.get_full_block_range(header.hash(), 50).await;
1796 assert_eq!(received.len(), 50);
1797 for (i, block) in received.iter().enumerate() {
1798 let expected_number = header.number - i as u64;
1799 assert_eq!(block.number, expected_number);
1800 }
1801 }
1802
1803 #[tokio::test]
1804 async fn download_full_block_range_retries_after_body_error() {
1805 let mut client = TestFullBlockClient::default();
1806 client.set_soft_limit(2);
1807 let (header, _) = insert_headers_into_client(&client, 0..3);
1808
1809 let client = FailingBodiesClient::new(client, 1);
1810 let body_requests = Arc::clone(&client.body_requests);
1811 let client = FullBlockClient::test_client(client);
1812
1813 let received =
1814 timeout(Duration::from_secs(1), client.get_full_block_range(header.hash(), 3))
1815 .await
1816 .expect("body request retry should complete");
1817
1818 assert_eq!(received.len(), 3);
1819 assert_eq!(body_requests.load(Ordering::SeqCst), 3);
1820 }
1821
1822 #[tokio::test]
1823 async fn download_full_block_range_with_access_lists() {
1824 let client = FullBlockWithAccessListsClient::default();
1825 let (header, _) = insert_headers_with_access_lists_into_client(&client, 0..3);
1826
1827 let access_lists = Arc::clone(&client.access_lists);
1828 let request_count = Arc::clone(&client.access_list_requests);
1829 let requirement = Arc::clone(&client.last_access_list_requirement);
1830 let client = FullBlockClient::test_client(client);
1831
1832 let response = timeout(
1833 Duration::from_secs(1),
1834 client.get_full_block_range_with_optional_access_lists(header.hash(), 3),
1835 )
1836 .await
1837 .expect("range request should complete");
1838
1839 let blocks = response;
1840 assert_eq!(blocks.len(), 3);
1841 let expected = {
1842 let access_lists = access_lists.lock();
1843 blocks
1844 .iter()
1845 .map(|block| {
1846 let access_list = access_lists
1847 .get(&block.block().hash())
1848 .cloned()
1849 .expect("access list exists");
1850 Some(sealed_access_list(access_list))
1851 })
1852 .collect::<Vec<_>>()
1853 };
1854 assert_eq!(range_access_lists(&blocks), expected);
1855 assert_eq!(request_count.load(Ordering::SeqCst), 1);
1856 assert_eq!(*requirement.lock(), Some(BalRequirement::Optional));
1857 }
1858
1859 #[tokio::test]
1860 async fn download_full_block_range_with_access_lists_returns_none_for_empty_response() {
1861 let client = FullBlockWithAccessListsClient::default();
1862 client.empty_first_response.store(true, Ordering::SeqCst);
1863 let (header, _) = insert_headers_with_access_lists_into_client(&client, 0..3);
1864
1865 let request_count = Arc::clone(&client.access_list_requests);
1866 let client = FullBlockClient::test_client(client);
1867
1868 let response = timeout(
1869 Duration::from_secs(1),
1870 client.get_full_block_range_with_optional_access_lists(header.hash(), 3),
1871 )
1872 .await
1873 .expect("range request should complete without access lists");
1874
1875 let blocks = response;
1876 assert_eq!(blocks.len(), 3);
1877 assert_eq!(range_access_lists(&blocks), vec![None; blocks.len()]);
1878 assert_eq!(request_count.load(Ordering::SeqCst), 1);
1879 }
1880
1881 #[tokio::test]
1882 async fn download_full_block_range_with_access_lists_uses_requested_requirement() {
1883 let client = FullBlockWithAccessListsClient::default();
1884 let (header, _) = insert_headers_with_access_lists_into_client(&client, 0..3);
1885
1886 let requirement = Arc::clone(&client.last_access_list_requirement);
1887 let client = FullBlockClient::test_client(client);
1888
1889 let blocks = timeout(
1890 Duration::from_secs(1),
1891 client.get_full_block_range_with_optional_access_lists_with_requirement(
1892 header.hash(),
1893 3,
1894 BalRequirement::Mandatory,
1895 ),
1896 )
1897 .await
1898 .expect("range request should complete");
1899
1900 assert_eq!(blocks.len(), 3);
1901 assert_eq!(*requirement.lock(), Some(BalRequirement::Mandatory));
1902 }
1903
1904 #[tokio::test]
1905 async fn download_full_block_range_with_access_lists_preserves_short_response() {
1906 let client = FullBlockWithAccessListsClient::default();
1907 client.set_access_list_soft_limit(2);
1908 let (header, _) = insert_headers_with_access_lists_into_client(&client, 0..5);
1909
1910 let access_lists = Arc::clone(&client.access_lists);
1911 let request_count = Arc::clone(&client.access_list_requests);
1912 let client = FullBlockClient::test_client(client);
1913
1914 let blocks = timeout(
1915 Duration::from_secs(1),
1916 client.get_full_block_range_with_optional_access_lists(header.hash(), 5),
1917 )
1918 .await
1919 .expect("range request should complete without access lists");
1920
1921 assert_eq!(blocks.len(), 5);
1922 let expected = {
1923 let access_lists = access_lists.lock();
1924 blocks
1925 .iter()
1926 .enumerate()
1927 .map(|(idx, block)| {
1928 if idx >= 2 {
1929 return None
1930 }
1931
1932 let access_list = access_lists
1933 .get(&block.block().hash())
1934 .cloned()
1935 .expect("access list exists");
1936 Some(sealed_access_list(access_list))
1937 })
1938 .collect::<Vec<_>>()
1939 };
1940 assert_eq!(range_access_lists(&blocks), expected);
1941 assert_eq!(request_count.load(Ordering::SeqCst), 1);
1942 }
1943
1944 #[tokio::test]
1945 async fn download_full_block_range_with_access_lists_preserves_unavailable_entries() {
1946 let client = FullBlockWithAccessListsClient::default();
1947 let (header, _) = insert_headers_with_access_lists_into_client(&client, 0..3);
1948 client.access_lists.lock().remove(&header.hash());
1949
1950 let access_lists = Arc::clone(&client.access_lists);
1951 let bad_messages = Arc::clone(&client.bad_messages);
1952 let client = FullBlockClient::test_client(client);
1953
1954 let blocks = timeout(
1955 Duration::from_secs(1),
1956 client.get_full_block_range_with_optional_access_lists(header.hash(), 3),
1957 )
1958 .await
1959 .expect("range request should complete");
1960
1961 assert_eq!(blocks.len(), 3);
1962 let expected = {
1963 let access_lists = access_lists.lock();
1964 blocks
1965 .iter()
1966 .map(|block| {
1967 if block.block().hash() == header.hash() {
1968 return None
1969 }
1970
1971 let access_list = access_lists
1972 .get(&block.block().hash())
1973 .cloned()
1974 .expect("access list exists");
1975 Some(sealed_access_list(access_list))
1976 })
1977 .collect::<Vec<_>>()
1978 };
1979 assert_eq!(range_access_lists(&blocks), expected);
1980 assert_eq!(bad_messages.load(Ordering::SeqCst), 0);
1981 }
1982
1983 #[tokio::test]
1984 async fn download_full_block_range_with_access_lists_returns_none_when_unavailable() {
1985 let client = FullBlockWithAccessListsClient::default();
1986 client.set_access_lists_unsupported(true);
1987 let (header, _) = insert_headers_with_access_lists_into_client(&client, 0..3);
1988
1989 let request_count = Arc::clone(&client.access_list_requests);
1990 let client = FullBlockClient::test_client(client);
1991
1992 let blocks = timeout(
1993 Duration::from_secs(1),
1994 client.get_full_block_range_with_optional_access_lists(header.hash(), 3),
1995 )
1996 .await
1997 .expect("range request should complete without access lists");
1998
1999 assert_eq!(blocks.len(), 3);
2000 assert_eq!(range_access_lists(&blocks), vec![None; blocks.len()]);
2001 assert_eq!(request_count.load(Ordering::SeqCst), 1);
2002 }
2003
2004 #[tokio::test]
2005 async fn download_full_block_range_with_access_lists_ignores_long_response() {
2006 let client = FullBlockWithAccessListsClient::default();
2007 client.set_extra_access_list_entries(1);
2008 let (header, _) = insert_headers_with_access_lists_into_client(&client, 0..3);
2009
2010 let request_count = Arc::clone(&client.access_list_requests);
2011 let bad_messages = Arc::clone(&client.bad_messages);
2012 let client = FullBlockClient::test_client(client);
2013
2014 let blocks = timeout(
2015 Duration::from_secs(1),
2016 client.get_full_block_range_with_optional_access_lists(header.hash(), 3),
2017 )
2018 .await
2019 .expect("range request should complete without access lists");
2020
2021 assert_eq!(blocks.len(), 3);
2022 assert_eq!(range_access_lists(&blocks), vec![None; blocks.len()]);
2023 assert_eq!(request_count.load(Ordering::SeqCst), 1);
2024 assert_eq!(bad_messages.load(Ordering::SeqCst), 0);
2025 }
2026
2027 #[tokio::test]
2028 async fn download_full_block_range_with_access_lists_rejects_wrong_hash() {
2029 let client = FullBlockWithAccessListsClient::default();
2030 let (header, _) = insert_headers_with_access_lists_into_client(&client, 0..3);
2031 client.access_lists.lock().insert(header.hash(), Bytes::from_static(&[0xc1, 0x7f]));
2032
2033 let bad_messages = Arc::clone(&client.bad_messages);
2034 let client = FullBlockClient::test_client(client);
2035
2036 let blocks = timeout(
2037 Duration::from_secs(1),
2038 client.get_full_block_range_with_optional_access_lists(header.hash(), 3),
2039 )
2040 .await
2041 .expect("range request should complete without access lists");
2042
2043 assert_eq!(blocks.len(), 3);
2044 assert_eq!(range_access_lists(&blocks), vec![None; blocks.len()]);
2045 assert_eq!(bad_messages.load(Ordering::SeqCst), 1);
2046 }
2047
2048 #[tokio::test]
2049 async fn download_full_block_range_with_access_lists_preserves_valid_prefix_until_wrong_hash() {
2050 let client = FullBlockWithAccessListsClient::default();
2051 let (header, _) = insert_headers_with_access_lists_into_client(&client, 0..3);
2052 let first_access_list =
2053 client.access_lists.lock().get(&header.hash()).cloned().expect("access list exists");
2054 let second_hash = header.parent_hash;
2055 client.access_lists.lock().insert(second_hash, Bytes::from_static(&[0xc1, 0x7f]));
2056
2057 let bad_messages = Arc::clone(&client.bad_messages);
2058 let client = FullBlockClient::test_client(client);
2059
2060 let blocks = timeout(
2061 Duration::from_secs(1),
2062 client.get_full_block_range_with_optional_access_lists(header.hash(), 3),
2063 )
2064 .await
2065 .expect("range request should complete without unvalidated access lists");
2066
2067 assert_eq!(blocks.len(), 3);
2068 assert_eq!(blocks[1].block().hash(), second_hash);
2069 assert_eq!(
2070 range_access_lists(&blocks),
2071 vec![Some(sealed_access_list(first_access_list)), None, None]
2072 );
2073 assert_eq!(bad_messages.load(Ordering::SeqCst), 1);
2074 }
2075
2076 #[tokio::test]
2077 async fn download_full_block_range_with_invalid_header() {
2078 let client = TestFullBlockClient::default();
2079 let range_length: usize = 3;
2080 let (header, _) = insert_headers_into_client(&client, 0..range_length);
2081
2082 let test_consensus = reth_consensus::test_utils::TestConsensus::default();
2083 test_consensus.set_fail_validation(true);
2084 test_consensus.set_fail_body_against_header(false);
2085 let client = FullBlockClient::new(client, Arc::new(test_consensus));
2086
2087 let received = client.get_full_block_range(header.hash(), range_length as u64).await;
2088
2089 assert_eq!(received.len(), range_length);
2090 for (i, block) in received.iter().enumerate() {
2091 let expected_number = header.number - i as u64;
2092 assert_eq!(block.number, expected_number);
2093 }
2094 }
2095}