1use super::headers::client::HeadersRequest;
2use crate::{
3 block_access_lists::client::{BalRequirement, BlockAccessListsClient},
4 bodies::client::{BodiesClient, SingleBodyRequest},
5 download::DownloadClient,
6 error::PeerRequestResult,
7 headers::client::{HeadersClient, SingleHeaderRequest},
8 priority::Priority,
9 BlockClient,
10};
11use alloy_consensus::BlockHeader;
12use alloy_primitives::{keccak256, Bytes, Sealable, Sealed, B256};
13use core::marker::PhantomData;
14use futures::FutureExt;
15use reth_consensus::Consensus;
16use reth_eth_wire_types::{
17 BlockAccessLists, EthNetworkPrimitives, HeadersDirection, NetworkPrimitives,
18};
19use reth_network_peers::{PeerId, WithPeerId};
20use reth_primitives_traits::{Block, SealedBlock, SealedHeader};
21use std::{
22 cmp::Reverse,
23 collections::{HashMap, VecDeque},
24 fmt::Debug,
25 hash::Hash,
26 ops::RangeInclusive,
27 pin::Pin,
28 sync::Arc,
29 task::{ready, Context, Poll},
30};
31use tracing::{debug, trace};
32
33#[derive(Debug, Clone, PartialEq, Eq)]
35pub struct SealedBlockWith<B: Block, T = Option<Sealed<Bytes>>> {
36 block: SealedBlock<B>,
37 data: T,
38}
39
40pub type SealedBlockAccessList = Sealed<Bytes>;
42
43pub type SealedBlockWithAccessList<B> = SealedBlockWith<B, Option<SealedBlockAccessList>>;
45
46impl<B: Block, T> SealedBlockWith<B, T> {
47 pub const fn new(block: SealedBlock<B>, data: T) -> Self {
49 Self { block, data }
50 }
51
52 pub const fn block(&self) -> &SealedBlock<B> {
54 &self.block
55 }
56
57 pub const fn data(&self) -> &T {
59 &self.data
60 }
61
62 pub fn into_parts(self) -> (SealedBlock<B>, T) {
64 (self.block, self.data)
65 }
66}
67
68impl<B: Block> SealedBlockWithAccessList<B> {
69 pub const fn from_block(block: SealedBlock<B>) -> Self {
71 Self::new(block, None)
72 }
73
74 pub const fn access_list(&self) -> Option<&SealedBlockAccessList> {
76 self.data.as_ref()
77 }
78
79 pub const fn access_lists(&self) -> Option<&SealedBlockAccessList> {
81 self.data.as_ref()
82 }
83}
84
85impl<B: Block> From<SealedBlock<B>> for SealedBlockWith<B> {
86 fn from(block: SealedBlock<B>) -> Self {
87 Self::new(block, None)
88 }
89}
90
91#[derive(Debug, Clone)]
93pub struct FullBlockClient<Client>
94where
95 Client: BlockClient,
96{
97 client: Client,
98 consensus: Arc<dyn Consensus<Client::Block>>,
99}
100
101impl<Client> FullBlockClient<Client>
102where
103 Client: BlockClient,
104{
105 pub fn new(client: Client, consensus: Arc<dyn Consensus<Client::Block>>) -> Self {
107 Self { client, consensus }
108 }
109
110 #[cfg(any(test, feature = "test-utils"))]
112 pub fn test_client(client: Client) -> Self {
113 Self::new(client, Arc::new(reth_consensus::test_utils::TestConsensus::default()))
114 }
115}
116
117impl<Client> FullBlockClient<Client>
118where
119 Client: BlockClient,
120{
121 pub fn get_full_block(&self, hash: B256) -> FetchFullBlockFuture<Client> {
128 FetchFullBlockFuture::new(self.client.clone(), self.consensus.clone(), hash)
129 }
130
131 pub fn get_full_block_range(
141 &self,
142 hash: B256,
143 count: u64,
144 ) -> FetchFullBlockRangeFuture<Client> {
145 let client = self.client.clone();
146 FetchFullBlockRangeFuture {
147 start_hash: hash,
148 count,
149 request: FullBlockRangeRequest {
150 headers: Some(client.get_headers(HeadersRequest::falling(hash.into(), count))),
151 bodies: None,
152 },
153 client,
154 headers: None,
155 pending_headers: VecDeque::new(),
156 bodies: HashMap::default(),
157 consensus: Arc::clone(&self.consensus),
158 }
159 }
160}
161
162impl<Client> FullBlockClient<Client>
163where
164 Client: BlockClient + BlockAccessListsClient,
165{
166 pub fn get_full_block_with_access_lists(
174 &self,
175 hash: B256,
176 ) -> FetchFullBlockWithBalFuture<Client> {
177 self.get_full_block_with_access_lists_with_requirement(hash, BalRequirement::default())
178 }
179
180 pub fn get_full_block_with_access_lists_with_requirement(
188 &self,
189 hash: B256,
190 requirement: BalRequirement,
191 ) -> FetchFullBlockWithBalFuture<Client> {
192 let client = self.client.clone();
193 FetchFullBlockWithBalFuture {
194 block: FetchFullBlockFuture::new(client.clone(), self.consensus.clone(), hash),
195 block_result: None,
196 bal_request_state: BalRequestState::Pending(
197 client.get_block_access_lists_with_requirement(vec![hash], requirement),
198 ),
199 }
200 }
201
202 pub fn get_full_block_range_with_optional_access_lists(
209 &self,
210 hash: B256,
211 count: u64,
212 ) -> FetchFullBlockRangeWithBalFuture<Client> {
213 self.get_full_block_range_with_optional_access_lists_with_requirement(
214 hash,
215 count,
216 BalRequirement::default(),
217 )
218 }
219
220 pub fn get_full_block_range_with_optional_access_lists_with_requirement(
227 &self,
228 hash: B256,
229 count: u64,
230 requirement: BalRequirement,
231 ) -> FetchFullBlockRangeWithBalFuture<Client> {
232 let client = self.client.clone();
233 FetchFullBlockRangeWithBalFuture {
234 blocks: self.get_full_block_range(hash, count),
235 client,
236 block_result: None,
237 access_lists: OptionalBlockAccessListsState::WaitingForBlocks { requirement },
238 }
239 }
240}
241
242#[must_use = "futures do nothing unless polled"]
247pub struct FetchFullBlockFuture<Client>
248where
249 Client: BlockClient,
250{
251 client: Client,
252 consensus: Arc<dyn Consensus<Client::Block>>,
253 hash: B256,
254 request: FullBlockRequest<Client>,
255 header: Option<SealedHeader<Client::Header>>,
256 body: Option<BodyResponse<Client::Body>>,
257}
258
259impl<Client> FetchFullBlockFuture<Client>
260where
261 Client: BlockClient,
262{
263 fn new(client: Client, consensus: Arc<dyn Consensus<Client::Block>>, hash: B256) -> Self {
264 Self {
265 hash,
266 consensus,
267 request: FullBlockRequest {
268 header: Some(client.get_header(hash.into())),
269 body: Some(client.get_block_body(hash)),
270 },
271 client,
272 header: None,
273 body: None,
274 }
275 }
276
277 pub const fn hash(&self) -> &B256 {
279 &self.hash
280 }
281
282 pub fn block_number(&self) -> Option<u64> {
284 self.header.as_ref().map(|h| h.number())
285 }
286
287 fn take_block(&mut self) -> Option<SealedBlock<Client::Block>> {
289 if self.header.is_none() || self.body.is_none() {
290 return None
291 }
292
293 let header = self.header.take().unwrap();
294 let resp = self.body.take().unwrap();
295 match resp {
296 BodyResponse::Validated(body) => Some(SealedBlock::from_sealed_parts(header, body)),
297 BodyResponse::PendingValidation(resp) => {
298 if let Err(err) = self.consensus.validate_body_against_header(resp.data(), &header)
300 {
301 debug!(target: "downloaders", %err, hash=?header.hash(), "Received wrong body");
302 self.client.report_bad_message(resp.peer_id());
303 self.header = Some(header);
304 self.request.body = Some(self.client.get_block_body(self.hash));
305 return None
306 }
307 Some(SealedBlock::from_sealed_parts(header, resp.into_data()))
308 }
309 }
310 }
311
312 fn on_block_response(&mut self, resp: WithPeerId<Client::Body>) {
313 if let Some(ref header) = self.header {
314 if let Err(err) = self.consensus.validate_body_against_header(resp.data(), header) {
315 debug!(target: "downloaders", %err, hash=?header.hash(), "Received wrong body");
316 self.client.report_bad_message(resp.peer_id());
317 return
318 }
319 self.body = Some(BodyResponse::Validated(resp.into_data()));
320 return
321 }
322 self.body = Some(BodyResponse::PendingValidation(resp));
323 }
324}
325
326impl<Client> Future for FetchFullBlockFuture<Client>
327where
328 Client: BlockClient<Header: BlockHeader + Sealable> + 'static,
329{
330 type Output = SealedBlock<Client::Block>;
331
332 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
333 let this = self.get_mut();
334
335 let mut budget = 4;
337
338 loop {
339 match ready!(this.request.poll(cx)) {
340 ResponseResult::Header(res) => {
341 match res {
342 Ok(maybe_header) => {
343 let (peer, maybe_header) =
344 maybe_header.map(|h| h.map(SealedHeader::seal_slow)).split();
345 if let Some(header) = maybe_header {
346 if header.hash() == this.hash {
347 this.header = Some(header);
348 } else {
349 debug!(target: "downloaders", expected=?this.hash, received=?header.hash(), "Received wrong header");
350 this.client.report_bad_message(peer)
352 }
353 }
354 }
355 Err(err) => {
356 debug!(target: "downloaders", %err, ?this.hash, "Header download failed");
357 }
358 }
359
360 if this.header.is_none() {
361 this.request.header = Some(this.client.get_header(this.hash.into()));
363 }
364 }
365 ResponseResult::Body(res) => {
366 match res {
367 Ok(maybe_body) => {
368 if let Some(body) = maybe_body.transpose() {
369 this.on_block_response(body);
370 }
371 }
372 Err(err) => {
373 debug!(target: "downloaders", %err, ?this.hash, "Body download failed");
374 }
375 }
376 if this.body.is_none() {
377 this.request.body = Some(this.client.get_block_body(this.hash));
379 }
380 }
381 }
382
383 if let Some(res) = this.take_block() {
384 return Poll::Ready(res)
385 }
386
387 budget -= 1;
389 if budget == 0 {
390 cx.waker().wake_by_ref();
392 return Poll::Pending
393 }
394 }
395 }
396}
397
398#[must_use = "futures do nothing unless polled"]
403pub struct FetchFullBlockWithBalFuture<Client>
404where
405 Client: BlockClient + BlockAccessListsClient,
406{
407 block: FetchFullBlockFuture<Client>,
408 block_result: Option<SealedBlock<Client::Block>>,
409 bal_request_state: BalRequestState<<Client as BlockAccessListsClient>::Output>,
410}
411
412impl<Client> FetchFullBlockWithBalFuture<Client>
413where
414 Client: BlockClient<Header: BlockHeader> + BlockAccessListsClient,
415{
416 pub const fn hash(&self) -> &B256 {
418 self.block.hash()
419 }
420}
421
422impl<Client> FetchFullBlockWithBalFuture<Client>
423where
424 Client: BlockClient<Header: BlockHeader + Sealable> + BlockAccessListsClient + 'static,
425{
426 pub fn block_number(&self) -> Option<u64> {
428 self.block_result.as_ref().map(|block| block.number()).or_else(|| self.block.block_number())
429 }
430
431 fn poll_bal_request(&mut self, cx: &mut Context<'_>) -> Poll<()> {
437 let res = match &mut self.bal_request_state {
438 BalRequestState::Pending(fut) => ready!(fut.poll_unpin(cx)),
439 BalRequestState::Ready(_) => return Poll::Ready(()),
440 };
441
442 match res {
443 Ok(bal) => {
444 let (peer, access_lists) = bal.split();
445 match access_lists.0.len() {
446 0 => self.bal_request_state = BalRequestState::Ready(None),
447 1 => {
448 let access_list = access_lists.0.into_iter().next().expect("len checked");
449 self.bal_request_state =
450 BalRequestState::Ready(Some(WithPeerId::new(peer, access_list)));
451 }
452 received => {
453 debug!(
454 target: "downloaders",
455 hash = ?self.block.hash(),
456 expected = 1,
457 received,
458 "Received wrong access list response",
459 );
460 self.block.client.report_bad_message(peer);
461 self.bal_request_state = BalRequestState::Ready(None);
462 }
463 }
464 }
465 Err(err) => {
466 debug!(
467 target: "downloaders",
468 %err,
469 hash = ?self.block.hash(),
470 "Access list download failed",
471 );
472 self.bal_request_state = BalRequestState::Ready(None);
473 }
474 }
475
476 Poll::Ready(())
477 }
478
479 fn take_block_and_access_lists(&mut self) -> Option<SealedBlockWithAccessList<Client::Block>> {
485 let BalRequestState::Ready(access_list) = &mut self.bal_request_state else { return None };
486 let block = self.block_result.take()?;
487 let access_list = access_list.take().and_then(|access_list| {
488 match seal_block_access_list_for_block(&block, access_list) {
489 Ok(access_list) => access_list,
490 Err(peer) => {
491 self.block.client.report_bad_message(peer);
492 None
493 }
494 }
495 });
496 Some(SealedBlockWith::new(block, access_list))
497 }
498}
499
500impl<Client> Future for FetchFullBlockWithBalFuture<Client>
501where
502 Client: BlockClient<Header: BlockHeader + Sealable> + BlockAccessListsClient + 'static,
503{
504 type Output = SealedBlockWithAccessList<Client::Block>;
505
506 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
507 let this = self.get_mut();
508
509 if this.block_result.is_none() &&
510 let Poll::Ready(block) = this.block.poll_unpin(cx)
511 {
512 this.block_result = Some(block);
513 }
514
515 ready!(this.poll_bal_request(cx));
516
517 if let Some(res) = this.take_block_and_access_lists() {
518 return Poll::Ready(res)
519 }
520
521 Poll::Pending
522 }
523}
524
525impl<Client> Debug for FetchFullBlockWithBalFuture<Client>
526where
527 Client: BlockClient<Header: BlockHeader> + BlockAccessListsClient,
528{
529 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
530 f.debug_struct("FetchFullBlockWithBalFuture")
531 .field("hash", &self.block.hash())
532 .field("block_ready", &self.block_result.is_some())
533 .field("bal_request_ready", &self.bal_request_state.is_ready())
534 .finish()
535 }
536}
537
538enum BalRequestState<Req> {
540 Pending(Req),
541 Ready(Option<WithPeerId<Option<Bytes>>>),
542}
543
544impl<Req> BalRequestState<Req> {
545 const fn is_ready(&self) -> bool {
546 matches!(self, Self::Ready(_))
547 }
548}
549
550#[must_use = "futures do nothing unless polled"]
557#[expect(missing_debug_implementations)]
558pub struct FetchFullBlockRangeWithBalFuture<Client>
559where
560 Client: BlockClient + BlockAccessListsClient,
561{
562 blocks: FetchFullBlockRangeFuture<Client>,
563 client: Client,
564 block_result: Option<Vec<SealedBlock<Client::Block>>>,
565 access_lists: OptionalBlockAccessListsState<<Client as BlockAccessListsClient>::Output>,
566}
567
568impl<Client> FetchFullBlockRangeWithBalFuture<Client>
569where
570 Client: BlockClient<Header: Debug + BlockHeader + Sealable + Clone + Hash + Eq>
571 + BlockAccessListsClient,
572{
573 fn start_access_lists_request_if_possible(&mut self) {
574 let requirement = match &self.access_lists {
575 OptionalBlockAccessListsState::WaitingForBlocks { requirement } => *requirement,
576 OptionalBlockAccessListsState::Pending(_) | OptionalBlockAccessListsState::Ready(_) => {
577 return
578 }
579 };
580
581 let Some(blocks) = self.block_result.as_ref() else { return };
583 let hashes = blocks.iter().map(|block| block.hash()).collect::<Vec<_>>();
584 self.access_lists = OptionalBlockAccessListsState::Pending(
585 self.client.get_block_access_lists_with_requirement(hashes, requirement),
586 );
587 }
588
589 fn poll_access_lists(&mut self, cx: &mut Context<'_>) {
591 self.start_access_lists_request_if_possible();
592
593 let poll = match &mut self.access_lists {
594 OptionalBlockAccessListsState::Pending(fut) => fut.poll_unpin(cx),
595 OptionalBlockAccessListsState::WaitingForBlocks { .. } |
596 OptionalBlockAccessListsState::Ready(_) => return,
597 };
598
599 match poll {
600 Poll::Pending => {}
601 Poll::Ready(Ok(access_lists)) => {
602 self.access_lists = OptionalBlockAccessListsState::Ready(Some(access_lists));
603 }
604 Poll::Ready(Err(err)) => {
605 debug!(
606 target: "downloaders",
607 %err,
608 start_hash = ?self.blocks.start_hash(),
609 "Access list range download failed",
610 );
611
612 self.access_lists = OptionalBlockAccessListsState::Ready(None);
615 }
616 }
617 }
618
619 fn take_response(&mut self) -> Option<Vec<SealedBlockWithAccessList<Client::Block>>> {
621 let OptionalBlockAccessListsState::Ready(access_lists) = &mut self.access_lists else {
622 return None
623 };
624
625 let blocks = self.block_result.take()?;
626
627 Some(seal_blocks_with_access_lists(&self.client, blocks, access_lists.take()))
628 }
629}
630
631impl<Client> Future for FetchFullBlockRangeWithBalFuture<Client>
632where
633 Client: BlockClient<Header: Debug + BlockHeader + Sealable + Clone + Hash + Eq>
634 + BlockAccessListsClient
635 + 'static,
636{
637 type Output = Vec<SealedBlockWithAccessList<Client::Block>>;
638
639 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
640 let this = self.get_mut();
641
642 if this.block_result.is_none() &&
644 let Poll::Ready(blocks) = this.blocks.poll_unpin(cx)
645 {
646 this.block_result = Some(blocks);
647 }
648
649 this.poll_access_lists(cx);
650
651 if let Some(response) = this.take_response() {
652 return Poll::Ready(response)
653 }
654
655 Poll::Pending
656 }
657}
658
659enum OptionalBlockAccessListsState<Req> {
661 WaitingForBlocks {
663 requirement: BalRequirement,
665 },
666 Pending(Req),
668 Ready(Option<WithPeerId<BlockAccessLists>>),
670}
671
672impl<Client> Debug for FetchFullBlockFuture<Client>
673where
674 Client: BlockClient<Header: Debug, Body: Debug>,
675{
676 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
677 f.debug_struct("FetchFullBlockFuture")
678 .field("hash", &self.hash)
679 .field("header", &self.header)
680 .field("body", &self.body)
681 .finish()
682 }
683}
684
685struct FullBlockRequest<Client>
686where
687 Client: BlockClient,
688{
689 header: Option<SingleHeaderRequest<<Client as HeadersClient>::Output>>,
690 body: Option<SingleBodyRequest<<Client as BodiesClient>::Output>>,
691}
692
693impl<Client> FullBlockRequest<Client>
694where
695 Client: BlockClient,
696{
697 fn poll(&mut self, cx: &mut Context<'_>) -> Poll<ResponseResult<Client::Header, Client::Body>> {
698 if let Some(fut) = Pin::new(&mut self.header).as_pin_mut() &&
699 let Poll::Ready(res) = fut.poll(cx)
700 {
701 self.header = None;
702 return Poll::Ready(ResponseResult::Header(res))
703 }
704
705 if let Some(fut) = Pin::new(&mut self.body).as_pin_mut() &&
706 let Poll::Ready(res) = fut.poll(cx)
707 {
708 self.body = None;
709 return Poll::Ready(ResponseResult::Body(res))
710 }
711
712 Poll::Pending
713 }
714}
715
716enum ResponseResult<H, B> {
719 Header(PeerRequestResult<Option<H>>),
720 Body(PeerRequestResult<Option<B>>),
721}
722
723#[derive(Debug)]
725enum BodyResponse<B> {
726 Validated(B),
728 PendingValidation(WithPeerId<B>),
730}
731#[must_use = "futures do nothing unless polled"]
744#[expect(missing_debug_implementations)]
745pub struct FetchFullBlockRangeFuture<Client>
746where
747 Client: BlockClient,
748{
749 client: Client,
751 consensus: Arc<dyn Consensus<Client::Block>>,
753 start_hash: B256,
755 count: u64,
757 request: FullBlockRangeRequest<Client>,
759 headers: Option<Vec<SealedHeader<Client::Header>>>,
761 pending_headers: VecDeque<SealedHeader<Client::Header>>,
763 bodies: HashMap<SealedHeader<Client::Header>, BodyResponse<Client::Body>>,
765}
766
767impl<Client> FetchFullBlockRangeFuture<Client>
768where
769 Client: BlockClient<Header: Debug + BlockHeader + Sealable + Clone + Hash + Eq>,
770{
771 fn is_bodies_complete(&self) -> bool {
773 self.bodies.len() == self.count as usize
774 }
775
776 fn insert_body(&mut self, body_response: BodyResponse<Client::Body>) {
780 if let Some(header) = self.pending_headers.pop_front() {
781 self.bodies.insert(header, body_response);
782 }
783 }
784
785 fn insert_bodies(&mut self, bodies: impl IntoIterator<Item = BodyResponse<Client::Body>>) {
787 for body in bodies {
788 self.insert_body(body);
789 }
790 }
791
792 fn remaining_bodies_hashes(&self) -> Vec<B256> {
795 self.pending_headers.iter().map(|h| h.hash()).collect()
796 }
797
798 fn take_blocks(&mut self) -> Option<Vec<SealedBlock<Client::Block>>> {
806 if !self.is_bodies_complete() {
807 return None
809 }
810
811 let headers = self.headers.take()?;
812 let mut needs_retry = false;
813 let mut valid_responses = Vec::new();
814
815 for header in &headers {
816 if let Some(body_resp) = self.bodies.remove(header) {
817 let body = match body_resp {
819 BodyResponse::Validated(body) => body,
820 BodyResponse::PendingValidation(resp) => {
821 if let Err(err) =
823 self.consensus.validate_body_against_header(resp.data(), header)
824 {
825 debug!(target: "downloaders", %err, hash=?header.hash(), "Received wrong body in range response");
826 self.client.report_bad_message(resp.peer_id());
827
828 self.pending_headers.push_back(header.clone());
830 needs_retry = true;
831 continue
832 }
833
834 resp.into_data()
835 }
836 };
837
838 valid_responses
839 .push(SealedBlock::<Client::Block>::from_sealed_parts(header.clone(), body));
840 }
841 }
842
843 if needs_retry {
844 for block in valid_responses {
847 let (header, body) = block.split_sealed_header_body();
848 self.bodies.insert(header, BodyResponse::Validated(body));
849 }
850
851 self.headers = Some(headers);
853
854 let hashes = self.remaining_bodies_hashes();
856 self.request.bodies = Some(self.client.get_block_bodies(hashes));
857 return None
858 }
859
860 Some(valid_responses)
861 }
862
863 fn on_headers_response(&mut self, headers: WithPeerId<Vec<Client::Header>>) {
864 let (peer, mut headers_falling) =
865 headers.map(|h| h.into_iter().map(SealedHeader::seal_slow).collect::<Vec<_>>()).split();
866
867 if headers_falling.len() == self.count as usize {
869 headers_falling.sort_unstable_by_key(|h| Reverse(h.number()));
871
872 if headers_falling[0].hash() == self.start_hash {
874 let headers_rising = headers_falling.iter().rev().cloned().collect::<Vec<_>>();
875 if let Err(err) = self.consensus.validate_header_range(&headers_rising) {
877 debug!(target: "downloaders", %err, ?self.start_hash, "Received bad header response");
878 self.client.report_bad_message(peer);
879 }
880
881 let hashes = headers_falling.iter().map(|h| h.hash()).collect::<Vec<_>>();
883
884 self.pending_headers = headers_falling.clone().into();
886
887 if !self.has_bodies_request_started() {
889 self.request.bodies = Some(self.client.get_block_bodies(hashes));
891 }
892
893 self.headers = Some(headers_falling);
895 } else {
896 self.client.report_bad_message(peer);
898 }
899 }
900 }
901
902 const fn has_bodies_request_started(&self) -> bool {
905 self.request.bodies.is_some()
906 }
907
908 pub const fn start_hash(&self) -> B256 {
910 self.start_hash
911 }
912
913 pub const fn count(&self) -> u64 {
915 self.count
916 }
917}
918
919impl<Client> Future for FetchFullBlockRangeFuture<Client>
920where
921 Client: BlockClient<Header: Debug + BlockHeader + Sealable + Clone + Hash + Eq> + 'static,
922{
923 type Output = Vec<SealedBlock<Client::Block>>;
924
925 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
926 let this = self.get_mut();
927
928 loop {
929 match ready!(this.request.poll(cx)) {
930 RangeResponseResult::Header(res) => {
939 match res {
940 Ok(headers) => {
941 this.on_headers_response(headers);
942 }
943 Err(err) => {
944 debug!(target: "downloaders", %err, ?this.start_hash, "Header range download failed");
945 }
946 }
947
948 if this.headers.is_none() {
949 this.request.headers = Some(this.client.get_headers(HeadersRequest {
951 start: this.start_hash.into(),
952 limit: this.count,
953 direction: HeadersDirection::Falling,
954 }));
955 }
956 }
957 RangeResponseResult::Body(res) => {
963 match res {
964 Ok(bodies_resp) => {
965 let (peer, new_bodies) = bodies_resp.split();
966
967 this.insert_bodies(
969 new_bodies
970 .into_iter()
971 .map(|resp| WithPeerId::new(peer, resp))
972 .map(BodyResponse::PendingValidation),
973 );
974
975 if !this.is_bodies_complete() {
976 let req_hashes = this.remaining_bodies_hashes();
978
979 this.request.bodies = Some(this.client.get_block_bodies(req_hashes))
981 }
982 }
983 Err(err) => {
984 debug!(target: "downloaders", %err, ?this.start_hash, "Body range download failed");
985 }
986 }
987 if this.request.bodies.is_none() && !this.is_bodies_complete() {
988 let hashes = this.remaining_bodies_hashes();
1001 if !hashes.is_empty() {
1002 this.request.bodies = Some(this.client.get_block_bodies(hashes));
1003 }
1004 }
1005 }
1006 }
1007
1008 if let Some(res) = this.take_blocks() {
1009 return Poll::Ready(res)
1010 }
1011 }
1012 }
1013}
1014
1015struct FullBlockRangeRequest<Client>
1019where
1020 Client: BlockClient,
1021{
1022 headers: Option<<Client as HeadersClient>::Output>,
1023 bodies: Option<<Client as BodiesClient>::Output>,
1024}
1025
1026impl<Client> FullBlockRangeRequest<Client>
1027where
1028 Client: BlockClient,
1029{
1030 fn poll(
1031 &mut self,
1032 cx: &mut Context<'_>,
1033 ) -> Poll<RangeResponseResult<Client::Header, Client::Body>> {
1034 if let Some(fut) = Pin::new(&mut self.headers).as_pin_mut() &&
1035 let Poll::Ready(res) = fut.poll(cx)
1036 {
1037 self.headers = None;
1038 return Poll::Ready(RangeResponseResult::Header(res))
1039 }
1040
1041 if let Some(fut) = Pin::new(&mut self.bodies).as_pin_mut() &&
1042 let Poll::Ready(res) = fut.poll(cx)
1043 {
1044 self.bodies = None;
1045 return Poll::Ready(RangeResponseResult::Body(res))
1046 }
1047
1048 Poll::Pending
1049 }
1050}
1051
1052enum RangeResponseResult<H, B> {
1055 Header(PeerRequestResult<Vec<H>>),
1056 Body(PeerRequestResult<Vec<B>>),
1057}
1058
1059#[derive(Debug, Clone)]
1061#[non_exhaustive]
1062pub struct NoopFullBlockClient<Net = EthNetworkPrimitives>(PhantomData<Net>);
1063
1064impl<Net> DownloadClient for NoopFullBlockClient<Net>
1066where
1067 Net: Debug + Send + Sync,
1068{
1069 fn report_bad_message(&self, _peer_id: PeerId) {}
1076
1077 fn num_connected_peers(&self) -> usize {
1083 0
1084 }
1085}
1086
1087impl<Net> BodiesClient for NoopFullBlockClient<Net>
1089where
1090 Net: NetworkPrimitives,
1091{
1092 type Body = Net::BlockBody;
1093 type Output = futures::future::Ready<PeerRequestResult<Vec<Self::Body>>>;
1095
1096 fn get_block_bodies_with_priority_and_range_hint(
1107 &self,
1108 _hashes: Vec<B256>,
1109 _priority: Priority,
1110 _range_hint: Option<RangeInclusive<u64>>,
1111 ) -> Self::Output {
1112 futures::future::ready(Ok(WithPeerId::new(PeerId::random(), vec![])))
1115 }
1116}
1117
1118impl<Net> HeadersClient for NoopFullBlockClient<Net>
1119where
1120 Net: NetworkPrimitives,
1121{
1122 type Header = Net::BlockHeader;
1123 type Output = futures::future::Ready<PeerRequestResult<Vec<Self::Header>>>;
1126
1127 fn get_headers_with_priority(
1141 &self,
1142 _request: HeadersRequest,
1143 _priority: Priority,
1144 ) -> Self::Output {
1145 futures::future::ready(Ok(WithPeerId::new(PeerId::random(), vec![])))
1146 }
1147}
1148
1149impl<Net> BlockClient for NoopFullBlockClient<Net>
1150where
1151 Net: NetworkPrimitives,
1152{
1153 type Block = Net::Block;
1154}
1155
1156impl<Net> Default for NoopFullBlockClient<Net> {
1157 fn default() -> Self {
1158 Self(PhantomData::<Net>)
1159 }
1160}
1161
1162fn seal_block_access_list_for_block<B: Block>(
1168 block: &SealedBlock<B>,
1169 access_list: WithPeerId<Option<Bytes>>,
1170) -> Result<Option<Sealed<Bytes>>, PeerId> {
1171 let Some(expected) = block.header().block_access_list_hash() else { return Ok(None) };
1172
1173 let (peer, access_list) = access_list.split();
1174 let Some(access_list) = access_list else { return Ok(None) };
1175 let computed = keccak256(access_list.as_ref());
1176 if computed == expected {
1177 return Ok(Some(Sealed::new_unchecked(access_list, expected)))
1178 }
1179
1180 debug!(
1181 target: "downloaders",
1182 block_hash = ?block.hash(),
1183 ?computed,
1184 ?expected,
1185 "Received block access list with wrong hash",
1186 );
1187 Err(peer)
1188}
1189
1190fn seal_blocks_with_access_lists<Client>(
1197 client: &Client,
1198 blocks: Vec<SealedBlock<Client::Block>>,
1199 access_lists: Option<WithPeerId<BlockAccessLists>>,
1200) -> Vec<SealedBlockWithAccessList<Client::Block>>
1201where
1202 Client: BlockClient,
1203{
1204 let Some(access_lists) = access_lists else {
1205 return blocks.into_iter().map(SealedBlockWith::from_block).collect()
1206 };
1207
1208 let (peer, access_lists) = access_lists.split();
1209 let expected = blocks.len();
1210 let received = access_lists.0.len();
1211
1212 if received > expected {
1213 trace!(
1214 target: "downloaders",
1215 expected,
1216 received,
1217 "Ignoring overlong access list range response",
1218 );
1219 return blocks.into_iter().map(SealedBlockWith::from_block).collect()
1220 }
1221
1222 let mut access_lists = access_lists.0.into_iter();
1223 let mut blocks = blocks.into_iter();
1224 let mut response = Vec::with_capacity(expected);
1225
1226 for block in blocks.by_ref() {
1227 let Some(access_list) = access_lists.next() else {
1228 response.push(SealedBlockWith::from_block(block));
1231 break
1232 };
1233
1234 match seal_block_access_list_for_block(&block, WithPeerId::new(peer, access_list)) {
1235 Ok(access_list) => response.push(SealedBlockWith::new(block, access_list)),
1236 Err(peer) => {
1237 client.report_bad_message(peer);
1240 response.push(SealedBlockWith::from_block(block));
1241 break
1242 }
1243 }
1244 }
1245
1246 response.extend(blocks.map(SealedBlockWith::from_block));
1247 response
1248}
1249
1250#[cfg(test)]
1251mod tests {
1252 use reth_ethereum_primitives::BlockBody;
1253
1254 use super::*;
1255 use crate::{error::RequestError, test_utils::TestFullBlockClient};
1256 use alloy_consensus::Header;
1257 use alloy_primitives::{keccak256, Bytes};
1258 use parking_lot::Mutex;
1259 use std::{
1260 collections::HashMap,
1261 ops::Range,
1262 sync::{
1263 atomic::{AtomicBool, AtomicUsize, Ordering},
1264 Arc,
1265 },
1266 };
1267
1268 const EMPTY_LIST_CODE: u8 = 0xc0;
1269 use tokio::time::{timeout, Duration};
1270
1271 fn sealed_access_list(access_list: Bytes) -> Sealed<Bytes> {
1272 let hash = keccak256(access_list.as_ref());
1273 Sealed::new_unchecked(access_list, hash)
1274 }
1275
1276 fn sealed_header_with_access_list_hash(access_list: &Bytes) -> SealedHeader {
1277 let header = Header {
1278 block_access_list_hash: Some(keccak256(access_list.as_ref())),
1279 ..Default::default()
1280 };
1281 SealedHeader::seal_slow(header)
1282 }
1283
1284 fn range_access_lists<B: Block>(
1285 blocks: &[SealedBlockWithAccessList<B>],
1286 ) -> Vec<Option<Sealed<Bytes>>> {
1287 blocks.iter().map(|block| block.access_list().cloned()).collect()
1288 }
1289
1290 #[tokio::test]
1291 async fn download_single_full_block() {
1292 let client = TestFullBlockClient::default();
1293 let header: SealedHeader = SealedHeader::default();
1294 let body = BlockBody::default();
1295 client.insert(header.clone(), body.clone());
1296 let client = FullBlockClient::test_client(client);
1297
1298 let received = client.get_full_block(header.hash()).await;
1299 assert_eq!(received, SealedBlock::from_sealed_parts(header, body));
1300 }
1301
1302 #[tokio::test]
1303 async fn download_single_full_block_range() {
1304 let client = TestFullBlockClient::default();
1305 let header: SealedHeader = SealedHeader::default();
1306 let body = BlockBody::default();
1307 client.insert(header.clone(), body.clone());
1308 let client = FullBlockClient::test_client(client);
1309
1310 let received = client.get_full_block_range(header.hash(), 1).await;
1311 let received = received.first().expect("response should include a block");
1312 assert_eq!(*received, SealedBlock::from_sealed_parts(header, body));
1313 }
1314
1315 #[tokio::test]
1316 async fn download_single_full_block_with_access_lists() {
1317 let client = FullBlockWithAccessListsClient::default();
1318 let body = BlockBody::default();
1319 let access_list = Bytes::from_static(&[EMPTY_LIST_CODE]);
1320 let header = sealed_header_with_access_list_hash(&access_list);
1321 client.insert(header.clone(), body.clone(), access_list.clone());
1322
1323 let request_count = Arc::clone(&client.access_list_requests);
1324 let client = FullBlockClient::test_client(client);
1325
1326 let received = client.get_full_block_with_access_lists(header.hash()).await;
1327 let expected_access_list = sealed_access_list(access_list);
1328
1329 assert_eq!(received.block(), &SealedBlock::from_sealed_parts(header, body));
1330 assert_eq!(received.access_list(), Some(&expected_access_list));
1331 assert_eq!(request_count.load(Ordering::SeqCst), 1);
1332 }
1333
1334 #[tokio::test]
1335 async fn download_single_full_block_with_access_lists_uses_requested_requirement() {
1336 let client = FullBlockWithAccessListsClient::default();
1337 let body = BlockBody::default();
1338 let access_list = Bytes::from_static(&[EMPTY_LIST_CODE]);
1339 let header = sealed_header_with_access_list_hash(&access_list);
1340 client.insert(header.clone(), body.clone(), access_list.clone());
1341
1342 let requirement = Arc::clone(&client.last_access_list_requirement);
1343 let client = FullBlockClient::test_client(client);
1344
1345 let received = client
1346 .get_full_block_with_access_lists_with_requirement(
1347 header.hash(),
1348 BalRequirement::Mandatory,
1349 )
1350 .await;
1351
1352 let expected_access_list = sealed_access_list(access_list);
1353 assert_eq!(received.block(), &SealedBlock::from_sealed_parts(header, body));
1354 assert_eq!(received.access_list(), Some(&expected_access_list));
1355 assert_eq!(*requirement.lock(), Some(BalRequirement::Mandatory));
1356 }
1357
1358 #[tokio::test]
1359 async fn download_single_full_block_with_access_lists_waits_for_pending_access_lists() {
1360 let client = FullBlockWithAccessListsClient::default();
1361 client.set_access_list_pending_polls(1);
1362
1363 let body = BlockBody::default();
1364 let access_list = Bytes::from_static(&[EMPTY_LIST_CODE]);
1365 let header = sealed_header_with_access_list_hash(&access_list);
1366 client.insert(header.clone(), body.clone(), access_list.clone());
1367
1368 let request_count = Arc::clone(&client.access_list_requests);
1369 let client = FullBlockClient::test_client(client);
1370
1371 let received =
1372 timeout(Duration::from_secs(1), client.get_full_block_with_access_lists(header.hash()))
1373 .await
1374 .expect("access list request should complete");
1375
1376 let expected_access_list = sealed_access_list(access_list);
1377 assert_eq!(received.block(), &SealedBlock::from_sealed_parts(header, body));
1378 assert_eq!(received.access_list(), Some(&expected_access_list));
1379 assert_eq!(request_count.load(Ordering::SeqCst), 1);
1380 }
1381
1382 #[tokio::test]
1383 async fn download_single_full_block_with_access_lists_rejects_wrong_hash() {
1384 let client = FullBlockWithAccessListsClient::default();
1385 let body = BlockBody::default();
1386 let expected_access_list = Bytes::from_static(&[EMPTY_LIST_CODE]);
1387 let wrong_access_list = Bytes::from_static(&[0xc1, 0x01]);
1388 let header = sealed_header_with_access_list_hash(&expected_access_list);
1389 client.insert(header.clone(), body.clone(), wrong_access_list);
1390
1391 let bad_messages = Arc::clone(&client.bad_messages);
1392 let client = FullBlockClient::test_client(client);
1393
1394 let received =
1395 timeout(Duration::from_secs(1), client.get_full_block_with_access_lists(header.hash()))
1396 .await
1397 .expect("block request should complete without access lists");
1398
1399 assert_eq!(received.block(), &SealedBlock::from_sealed_parts(header, body));
1400 assert!(received.access_list().is_none());
1401 assert_eq!(bad_messages.load(Ordering::SeqCst), 1);
1402 }
1403
1404 #[tokio::test]
1405 async fn download_single_full_block_with_access_lists_treats_none_as_unavailable() {
1406 let client = FullBlockWithAccessListsClient::default();
1407 let body = BlockBody::default();
1408 let expected_access_list = Bytes::from_static(&[0xc1, 0x01]);
1409 let header = sealed_header_with_access_list_hash(&expected_access_list);
1410 client.inner.insert(header.clone(), body.clone());
1411
1412 let bad_messages = Arc::clone(&client.bad_messages);
1413 let client = FullBlockClient::test_client(client);
1414
1415 let received =
1416 timeout(Duration::from_secs(1), client.get_full_block_with_access_lists(header.hash()))
1417 .await
1418 .expect("block request should complete without access lists");
1419
1420 assert_eq!(received.block(), &SealedBlock::from_sealed_parts(header, body));
1421 assert!(received.access_list().is_none());
1422 assert_eq!(bad_messages.load(Ordering::SeqCst), 0);
1423 }
1424
1425 #[tokio::test]
1426 async fn download_single_full_block_with_access_lists_rejects_wrong_empty_list() {
1427 let client = FullBlockWithAccessListsClient::default();
1428 let body = BlockBody::default();
1429 let expected_access_list = Bytes::from_static(&[0xc1, 0x01]);
1430 let wrong_empty_access_list = Bytes::from_static(&[EMPTY_LIST_CODE]);
1431 let header = sealed_header_with_access_list_hash(&expected_access_list);
1432 client.insert(header.clone(), body.clone(), wrong_empty_access_list);
1433
1434 let bad_messages = Arc::clone(&client.bad_messages);
1435 let client = FullBlockClient::test_client(client);
1436
1437 let received =
1438 timeout(Duration::from_secs(1), client.get_full_block_with_access_lists(header.hash()))
1439 .await
1440 .expect("block request should complete without access lists");
1441
1442 assert_eq!(received.block(), &SealedBlock::from_sealed_parts(header, body));
1443 assert!(received.access_list().is_none());
1444 assert_eq!(bad_messages.load(Ordering::SeqCst), 1);
1445 }
1446
1447 #[tokio::test]
1448 async fn download_single_full_block_with_access_lists_returns_none_after_empty_response() {
1449 let client = FullBlockWithAccessListsClient::default();
1450 client.empty_first_response.store(true, Ordering::SeqCst);
1451
1452 let body = BlockBody::default();
1453 let access_list = Bytes::from_static(&[EMPTY_LIST_CODE]);
1454 let header = sealed_header_with_access_list_hash(&access_list);
1455 client.insert(header.clone(), body.clone(), access_list.clone());
1456
1457 let request_count = Arc::clone(&client.access_list_requests);
1458 let bad_messages = Arc::clone(&client.bad_messages);
1459 let client = FullBlockClient::test_client(client);
1460
1461 let received =
1462 timeout(Duration::from_secs(1), client.get_full_block_with_access_lists(header.hash()))
1463 .await
1464 .expect("block request should complete without access lists");
1465
1466 assert_eq!(received.block(), &SealedBlock::from_sealed_parts(header, body));
1467 assert!(received.access_list().is_none());
1468 assert_eq!(request_count.load(Ordering::SeqCst), 1);
1469 assert_eq!(bad_messages.load(Ordering::SeqCst), 0);
1470 }
1471
1472 #[tokio::test]
1473 async fn download_single_full_block_with_access_lists_returns_block_when_unavailable() {
1474 let client = FullBlockWithAccessListsClient::default();
1475 client.set_access_lists_unsupported(true);
1476
1477 let body = BlockBody::default();
1478 let access_list = Bytes::from_static(&[EMPTY_LIST_CODE]);
1479 let header = sealed_header_with_access_list_hash(&access_list);
1480 client.insert(header.clone(), body.clone(), access_list);
1481
1482 let request_count = Arc::clone(&client.access_list_requests);
1483 let requirement = Arc::clone(&client.last_access_list_requirement);
1484 let client = FullBlockClient::test_client(client);
1485
1486 let received =
1487 timeout(Duration::from_secs(1), client.get_full_block_with_access_lists(header.hash()))
1488 .await
1489 .expect("block request should complete without access lists");
1490
1491 assert_eq!(received.block(), &SealedBlock::from_sealed_parts(header, body));
1492 assert!(received.access_list().is_none());
1493 assert_eq!(request_count.load(Ordering::SeqCst), 1);
1494 assert_eq!(
1495 *requirement.lock(),
1496 Some(BalRequirement::Optional),
1497 "single block BAL lookup should be best-effort"
1498 );
1499 }
1500
1501 fn insert_headers_into_client(
1503 client: &TestFullBlockClient,
1504 range: Range<usize>,
1505 ) -> (SealedHeader, BlockBody) {
1506 let mut sealed_header: SealedHeader = SealedHeader::default();
1507 let body = BlockBody::default();
1508 for _ in range {
1509 let (mut header, hash) = sealed_header.split();
1510 header.parent_hash = hash;
1512 header.number += 1;
1513
1514 sealed_header = SealedHeader::seal_slow(header);
1515
1516 client.insert(sealed_header.clone(), body.clone());
1517 }
1518
1519 (sealed_header, body)
1520 }
1521
1522 #[derive(Clone, Debug)]
1523 struct FullBlockWithAccessListsClient {
1524 inner: TestFullBlockClient,
1525 access_lists: Arc<Mutex<HashMap<B256, Bytes>>>,
1526 access_list_requests: Arc<AtomicUsize>,
1527 access_list_soft_limit: Arc<AtomicUsize>,
1528 access_list_pending_polls: Arc<AtomicUsize>,
1529 extra_access_list_entries: Arc<AtomicUsize>,
1530 unsupported_access_lists: Arc<AtomicBool>,
1531 last_access_list_requirement: Arc<Mutex<Option<BalRequirement>>>,
1532 bad_messages: Arc<AtomicUsize>,
1533 empty_first_response: Arc<AtomicBool>,
1534 }
1535
1536 impl Default for FullBlockWithAccessListsClient {
1537 fn default() -> Self {
1538 Self {
1539 inner: TestFullBlockClient::default(),
1540 access_lists: Arc::new(Mutex::new(HashMap::default())),
1541 access_list_requests: Arc::new(AtomicUsize::new(0)),
1542 access_list_soft_limit: Arc::new(AtomicUsize::new(usize::MAX)),
1543 access_list_pending_polls: Arc::new(AtomicUsize::new(0)),
1544 extra_access_list_entries: Arc::new(AtomicUsize::new(0)),
1545 unsupported_access_lists: Arc::new(AtomicBool::new(false)),
1546 last_access_list_requirement: Arc::new(Mutex::new(None)),
1547 bad_messages: Arc::new(AtomicUsize::new(0)),
1548 empty_first_response: Arc::new(AtomicBool::new(false)),
1549 }
1550 }
1551 }
1552
1553 impl FullBlockWithAccessListsClient {
1554 fn insert(&self, header: SealedHeader, body: BlockBody, access_list: Bytes) {
1555 self.inner.insert(header.clone(), body);
1556 self.access_lists.lock().insert(header.hash(), access_list);
1557 }
1558
1559 fn set_access_list_soft_limit(&self, limit: usize) {
1560 self.access_list_soft_limit.store(limit, Ordering::SeqCst);
1561 }
1562
1563 fn set_access_list_pending_polls(&self, polls: usize) {
1564 self.access_list_pending_polls.store(polls, Ordering::SeqCst);
1565 }
1566
1567 fn set_extra_access_list_entries(&self, count: usize) {
1568 self.extra_access_list_entries.store(count, Ordering::SeqCst);
1569 }
1570
1571 fn set_access_lists_unsupported(&self, unsupported: bool) {
1572 self.unsupported_access_lists.store(unsupported, Ordering::SeqCst);
1573 }
1574 }
1575
1576 fn insert_headers_with_access_lists_into_client(
1578 client: &FullBlockWithAccessListsClient,
1579 range: Range<usize>,
1580 ) -> (SealedHeader, BlockBody) {
1581 let mut sealed_header: SealedHeader = SealedHeader::default();
1582 let body = BlockBody::default();
1583 for block_idx in range {
1584 let (mut header, hash) = sealed_header.split();
1585 header.parent_hash = hash;
1586 header.number += 1;
1587 let access_list = Bytes::from(vec![0xc1, block_idx as u8]);
1588 header.block_access_list_hash = Some(keccak256(access_list.as_ref()));
1589
1590 sealed_header = SealedHeader::seal_slow(header);
1591
1592 client.insert(sealed_header.clone(), body.clone(), access_list);
1593 }
1594
1595 (sealed_header, body)
1596 }
1597
1598 impl DownloadClient for FullBlockWithAccessListsClient {
1599 fn report_bad_message(&self, peer_id: PeerId) {
1600 self.bad_messages.fetch_add(1, Ordering::SeqCst);
1601 self.inner.report_bad_message(peer_id);
1602 }
1603
1604 fn num_connected_peers(&self) -> usize {
1605 self.inner.num_connected_peers()
1606 }
1607 }
1608
1609 impl HeadersClient for FullBlockWithAccessListsClient {
1610 type Header = <TestFullBlockClient as HeadersClient>::Header;
1611 type Output = <TestFullBlockClient as HeadersClient>::Output;
1612
1613 fn get_headers_with_priority(
1614 &self,
1615 request: HeadersRequest,
1616 priority: Priority,
1617 ) -> Self::Output {
1618 self.inner.get_headers_with_priority(request, priority)
1619 }
1620 }
1621
1622 impl BodiesClient for FullBlockWithAccessListsClient {
1623 type Body = <TestFullBlockClient as BodiesClient>::Body;
1624 type Output = <TestFullBlockClient as BodiesClient>::Output;
1625
1626 fn get_block_bodies_with_priority_and_range_hint(
1627 &self,
1628 hashes: Vec<B256>,
1629 priority: Priority,
1630 range_hint: Option<RangeInclusive<u64>>,
1631 ) -> Self::Output {
1632 self.inner.get_block_bodies_with_priority_and_range_hint(hashes, priority, range_hint)
1633 }
1634 }
1635
1636 struct MaybePendingAccessLists {
1637 response: Option<PeerRequestResult<BlockAccessLists>>,
1638 pending_polls: usize,
1639 }
1640
1641 impl MaybePendingAccessLists {
1642 const fn new(response: PeerRequestResult<BlockAccessLists>, pending_polls: usize) -> Self {
1643 Self { response: Some(response), pending_polls }
1644 }
1645 }
1646
1647 impl std::future::Future for MaybePendingAccessLists {
1648 type Output = PeerRequestResult<BlockAccessLists>;
1649
1650 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1651 if self.pending_polls > 0 {
1652 self.pending_polls -= 1;
1653 cx.waker().wake_by_ref();
1654 return Poll::Pending
1655 }
1656
1657 Poll::Ready(self.response.take().expect("future polled after completion"))
1658 }
1659 }
1660
1661 impl BlockAccessListsClient for FullBlockWithAccessListsClient {
1662 type Output = MaybePendingAccessLists;
1663
1664 fn get_block_access_lists_with_priority_and_requirement(
1665 &self,
1666 hashes: Vec<B256>,
1667 _priority: Priority,
1668 requirement: BalRequirement,
1669 ) -> Self::Output {
1670 self.access_list_requests.fetch_add(1, Ordering::SeqCst);
1671 *self.last_access_list_requirement.lock() = Some(requirement);
1672 let pending_polls = self.access_list_pending_polls.swap(0, Ordering::SeqCst);
1673
1674 if self.unsupported_access_lists.load(Ordering::SeqCst) {
1675 return MaybePendingAccessLists::new(
1676 Err(RequestError::UnsupportedCapability),
1677 pending_polls,
1678 )
1679 }
1680
1681 if self.empty_first_response.swap(false, Ordering::SeqCst) {
1682 return MaybePendingAccessLists::new(
1683 Ok(WithPeerId::new(PeerId::random(), BlockAccessLists(Vec::new()))),
1684 pending_polls,
1685 )
1686 }
1687
1688 let mut access_lists: Vec<_> = hashes
1689 .into_iter()
1690 .take(self.access_list_soft_limit.load(Ordering::SeqCst))
1691 .map(|hash| self.access_lists.lock().get(&hash).cloned())
1692 .collect();
1693 for _ in 0..self.extra_access_list_entries.load(Ordering::SeqCst) {
1694 access_lists.push(None);
1695 }
1696
1697 MaybePendingAccessLists::new(
1698 Ok(WithPeerId::new(PeerId::random(), BlockAccessLists(access_lists))),
1699 pending_polls,
1700 )
1701 }
1702 }
1703
1704 impl BlockClient for FullBlockWithAccessListsClient {
1705 type Block = reth_ethereum_primitives::Block;
1706 }
1707
1708 #[derive(Clone, Debug)]
1709 struct FailingBodiesClient {
1710 inner: TestFullBlockClient,
1711 fail_on: usize,
1712 body_requests: Arc<AtomicUsize>,
1713 }
1714
1715 impl FailingBodiesClient {
1716 fn new(inner: TestFullBlockClient, fail_on: usize) -> Self {
1717 Self { inner, fail_on, body_requests: Arc::new(AtomicUsize::new(0)) }
1718 }
1719 }
1720
1721 impl DownloadClient for FailingBodiesClient {
1722 fn report_bad_message(&self, peer_id: PeerId) {
1723 self.inner.report_bad_message(peer_id);
1724 }
1725
1726 fn num_connected_peers(&self) -> usize {
1727 self.inner.num_connected_peers()
1728 }
1729 }
1730
1731 impl HeadersClient for FailingBodiesClient {
1732 type Header = <TestFullBlockClient as HeadersClient>::Header;
1733 type Output = <TestFullBlockClient as HeadersClient>::Output;
1734
1735 fn get_headers_with_priority(
1736 &self,
1737 request: HeadersRequest,
1738 priority: Priority,
1739 ) -> Self::Output {
1740 self.inner.get_headers_with_priority(request, priority)
1741 }
1742 }
1743
1744 impl BodiesClient for FailingBodiesClient {
1745 type Body = <TestFullBlockClient as BodiesClient>::Body;
1746 type Output = <TestFullBlockClient as BodiesClient>::Output;
1747
1748 fn get_block_bodies_with_priority_and_range_hint(
1749 &self,
1750 hashes: Vec<B256>,
1751 priority: Priority,
1752 range_hint: Option<RangeInclusive<u64>>,
1753 ) -> Self::Output {
1754 let attempt = self.body_requests.fetch_add(1, Ordering::SeqCst);
1755 if attempt == self.fail_on {
1756 return futures::future::ready(Err(RequestError::Timeout))
1757 }
1758
1759 self.inner.get_block_bodies_with_priority_and_range_hint(hashes, priority, range_hint)
1760 }
1761 }
1762
1763 impl BlockClient for FailingBodiesClient {
1764 type Block = reth_ethereum_primitives::Block;
1765 }
1766
1767 #[tokio::test]
1768 async fn download_full_block_range() {
1769 let client = TestFullBlockClient::default();
1770 let (header, body) = insert_headers_into_client(&client, 0..50);
1771 let client = FullBlockClient::test_client(client);
1772
1773 let received = client.get_full_block_range(header.hash(), 1).await;
1774 let received = received.first().expect("response should include a block");
1775 assert_eq!(*received, SealedBlock::from_sealed_parts(header.clone(), body));
1776
1777 let received = client.get_full_block_range(header.hash(), 10).await;
1778 assert_eq!(received.len(), 10);
1779 for (i, block) in received.iter().enumerate() {
1780 let expected_number = header.number - i as u64;
1781 assert_eq!(block.number, expected_number);
1782 }
1783 }
1784
1785 #[tokio::test]
1786 async fn download_full_block_range_over_soft_limit() {
1787 let client = TestFullBlockClient::default();
1789 let (header, body) = insert_headers_into_client(&client, 0..50);
1790 let client = FullBlockClient::test_client(client);
1791
1792 let received = client.get_full_block_range(header.hash(), 1).await;
1793 let received = received.first().expect("response should include a block");
1794 assert_eq!(*received, SealedBlock::from_sealed_parts(header.clone(), body));
1795
1796 let received = client.get_full_block_range(header.hash(), 50).await;
1797 assert_eq!(received.len(), 50);
1798 for (i, block) in received.iter().enumerate() {
1799 let expected_number = header.number - i as u64;
1800 assert_eq!(block.number, expected_number);
1801 }
1802 }
1803
1804 #[tokio::test]
1805 async fn download_full_block_range_retries_after_body_error() {
1806 let mut client = TestFullBlockClient::default();
1807 client.set_soft_limit(2);
1808 let (header, _) = insert_headers_into_client(&client, 0..3);
1809
1810 let client = FailingBodiesClient::new(client, 1);
1811 let body_requests = Arc::clone(&client.body_requests);
1812 let client = FullBlockClient::test_client(client);
1813
1814 let received =
1815 timeout(Duration::from_secs(1), client.get_full_block_range(header.hash(), 3))
1816 .await
1817 .expect("body request retry should complete");
1818
1819 assert_eq!(received.len(), 3);
1820 assert_eq!(body_requests.load(Ordering::SeqCst), 3);
1821 }
1822
1823 #[tokio::test]
1824 async fn download_full_block_range_with_access_lists() {
1825 let client = FullBlockWithAccessListsClient::default();
1826 let (header, _) = insert_headers_with_access_lists_into_client(&client, 0..3);
1827
1828 let access_lists = Arc::clone(&client.access_lists);
1829 let request_count = Arc::clone(&client.access_list_requests);
1830 let requirement = Arc::clone(&client.last_access_list_requirement);
1831 let client = FullBlockClient::test_client(client);
1832
1833 let response = timeout(
1834 Duration::from_secs(1),
1835 client.get_full_block_range_with_optional_access_lists(header.hash(), 3),
1836 )
1837 .await
1838 .expect("range request should complete");
1839
1840 let blocks = response;
1841 assert_eq!(blocks.len(), 3);
1842 let expected = {
1843 let access_lists = access_lists.lock();
1844 blocks
1845 .iter()
1846 .map(|block| {
1847 let access_list = access_lists
1848 .get(&block.block().hash())
1849 .cloned()
1850 .expect("access list exists");
1851 Some(sealed_access_list(access_list))
1852 })
1853 .collect::<Vec<_>>()
1854 };
1855 assert_eq!(range_access_lists(&blocks), expected);
1856 assert_eq!(request_count.load(Ordering::SeqCst), 1);
1857 assert_eq!(*requirement.lock(), Some(BalRequirement::Optional));
1858 }
1859
1860 #[tokio::test]
1861 async fn download_full_block_range_with_access_lists_returns_none_for_empty_response() {
1862 let client = FullBlockWithAccessListsClient::default();
1863 client.empty_first_response.store(true, Ordering::SeqCst);
1864 let (header, _) = insert_headers_with_access_lists_into_client(&client, 0..3);
1865
1866 let request_count = Arc::clone(&client.access_list_requests);
1867 let client = FullBlockClient::test_client(client);
1868
1869 let response = timeout(
1870 Duration::from_secs(1),
1871 client.get_full_block_range_with_optional_access_lists(header.hash(), 3),
1872 )
1873 .await
1874 .expect("range request should complete without access lists");
1875
1876 let blocks = response;
1877 assert_eq!(blocks.len(), 3);
1878 assert_eq!(range_access_lists(&blocks), vec![None; blocks.len()]);
1879 assert_eq!(request_count.load(Ordering::SeqCst), 1);
1880 }
1881
1882 #[tokio::test]
1883 async fn download_full_block_range_with_access_lists_uses_requested_requirement() {
1884 let client = FullBlockWithAccessListsClient::default();
1885 let (header, _) = insert_headers_with_access_lists_into_client(&client, 0..3);
1886
1887 let requirement = Arc::clone(&client.last_access_list_requirement);
1888 let client = FullBlockClient::test_client(client);
1889
1890 let blocks = timeout(
1891 Duration::from_secs(1),
1892 client.get_full_block_range_with_optional_access_lists_with_requirement(
1893 header.hash(),
1894 3,
1895 BalRequirement::Mandatory,
1896 ),
1897 )
1898 .await
1899 .expect("range request should complete");
1900
1901 assert_eq!(blocks.len(), 3);
1902 assert_eq!(*requirement.lock(), Some(BalRequirement::Mandatory));
1903 }
1904
1905 #[tokio::test]
1906 async fn download_full_block_range_with_access_lists_preserves_short_response() {
1907 let client = FullBlockWithAccessListsClient::default();
1908 client.set_access_list_soft_limit(2);
1909 let (header, _) = insert_headers_with_access_lists_into_client(&client, 0..5);
1910
1911 let access_lists = Arc::clone(&client.access_lists);
1912 let request_count = Arc::clone(&client.access_list_requests);
1913 let client = FullBlockClient::test_client(client);
1914
1915 let blocks = timeout(
1916 Duration::from_secs(1),
1917 client.get_full_block_range_with_optional_access_lists(header.hash(), 5),
1918 )
1919 .await
1920 .expect("range request should complete without access lists");
1921
1922 assert_eq!(blocks.len(), 5);
1923 let expected = {
1924 let access_lists = access_lists.lock();
1925 blocks
1926 .iter()
1927 .enumerate()
1928 .map(|(idx, block)| {
1929 if idx >= 2 {
1930 return None
1931 }
1932
1933 let access_list = access_lists
1934 .get(&block.block().hash())
1935 .cloned()
1936 .expect("access list exists");
1937 Some(sealed_access_list(access_list))
1938 })
1939 .collect::<Vec<_>>()
1940 };
1941 assert_eq!(range_access_lists(&blocks), expected);
1942 assert_eq!(request_count.load(Ordering::SeqCst), 1);
1943 }
1944
1945 #[tokio::test]
1946 async fn download_full_block_range_with_access_lists_preserves_unavailable_entries() {
1947 let client = FullBlockWithAccessListsClient::default();
1948 let (header, _) = insert_headers_with_access_lists_into_client(&client, 0..3);
1949 client.access_lists.lock().remove(&header.hash());
1950
1951 let access_lists = Arc::clone(&client.access_lists);
1952 let bad_messages = Arc::clone(&client.bad_messages);
1953 let client = FullBlockClient::test_client(client);
1954
1955 let blocks = timeout(
1956 Duration::from_secs(1),
1957 client.get_full_block_range_with_optional_access_lists(header.hash(), 3),
1958 )
1959 .await
1960 .expect("range request should complete");
1961
1962 assert_eq!(blocks.len(), 3);
1963 let expected = {
1964 let access_lists = access_lists.lock();
1965 blocks
1966 .iter()
1967 .map(|block| {
1968 if block.block().hash() == header.hash() {
1969 return None
1970 }
1971
1972 let access_list = access_lists
1973 .get(&block.block().hash())
1974 .cloned()
1975 .expect("access list exists");
1976 Some(sealed_access_list(access_list))
1977 })
1978 .collect::<Vec<_>>()
1979 };
1980 assert_eq!(range_access_lists(&blocks), expected);
1981 assert_eq!(bad_messages.load(Ordering::SeqCst), 0);
1982 }
1983
1984 #[tokio::test]
1985 async fn download_full_block_range_with_access_lists_returns_none_when_unavailable() {
1986 let client = FullBlockWithAccessListsClient::default();
1987 client.set_access_lists_unsupported(true);
1988 let (header, _) = insert_headers_with_access_lists_into_client(&client, 0..3);
1989
1990 let request_count = Arc::clone(&client.access_list_requests);
1991 let client = FullBlockClient::test_client(client);
1992
1993 let blocks = timeout(
1994 Duration::from_secs(1),
1995 client.get_full_block_range_with_optional_access_lists(header.hash(), 3),
1996 )
1997 .await
1998 .expect("range request should complete without access lists");
1999
2000 assert_eq!(blocks.len(), 3);
2001 assert_eq!(range_access_lists(&blocks), vec![None; blocks.len()]);
2002 assert_eq!(request_count.load(Ordering::SeqCst), 1);
2003 }
2004
2005 #[tokio::test]
2006 async fn download_full_block_range_with_access_lists_ignores_long_response() {
2007 let client = FullBlockWithAccessListsClient::default();
2008 client.set_extra_access_list_entries(1);
2009 let (header, _) = insert_headers_with_access_lists_into_client(&client, 0..3);
2010
2011 let request_count = Arc::clone(&client.access_list_requests);
2012 let bad_messages = Arc::clone(&client.bad_messages);
2013 let client = FullBlockClient::test_client(client);
2014
2015 let blocks = timeout(
2016 Duration::from_secs(1),
2017 client.get_full_block_range_with_optional_access_lists(header.hash(), 3),
2018 )
2019 .await
2020 .expect("range request should complete without access lists");
2021
2022 assert_eq!(blocks.len(), 3);
2023 assert_eq!(range_access_lists(&blocks), vec![None; blocks.len()]);
2024 assert_eq!(request_count.load(Ordering::SeqCst), 1);
2025 assert_eq!(bad_messages.load(Ordering::SeqCst), 0);
2026 }
2027
2028 #[tokio::test]
2029 async fn download_full_block_range_with_access_lists_rejects_wrong_hash() {
2030 let client = FullBlockWithAccessListsClient::default();
2031 let (header, _) = insert_headers_with_access_lists_into_client(&client, 0..3);
2032 client.access_lists.lock().insert(header.hash(), Bytes::from_static(&[0xc1, 0x7f]));
2033
2034 let bad_messages = Arc::clone(&client.bad_messages);
2035 let client = FullBlockClient::test_client(client);
2036
2037 let blocks = timeout(
2038 Duration::from_secs(1),
2039 client.get_full_block_range_with_optional_access_lists(header.hash(), 3),
2040 )
2041 .await
2042 .expect("range request should complete without access lists");
2043
2044 assert_eq!(blocks.len(), 3);
2045 assert_eq!(range_access_lists(&blocks), vec![None; blocks.len()]);
2046 assert_eq!(bad_messages.load(Ordering::SeqCst), 1);
2047 }
2048
2049 #[tokio::test]
2050 async fn download_full_block_range_with_access_lists_preserves_valid_prefix_until_wrong_hash() {
2051 let client = FullBlockWithAccessListsClient::default();
2052 let (header, _) = insert_headers_with_access_lists_into_client(&client, 0..3);
2053 let first_access_list =
2054 client.access_lists.lock().get(&header.hash()).cloned().expect("access list exists");
2055 let second_hash = header.parent_hash;
2056 client.access_lists.lock().insert(second_hash, Bytes::from_static(&[0xc1, 0x7f]));
2057
2058 let bad_messages = Arc::clone(&client.bad_messages);
2059 let client = FullBlockClient::test_client(client);
2060
2061 let blocks = timeout(
2062 Duration::from_secs(1),
2063 client.get_full_block_range_with_optional_access_lists(header.hash(), 3),
2064 )
2065 .await
2066 .expect("range request should complete without unvalidated access lists");
2067
2068 assert_eq!(blocks.len(), 3);
2069 assert_eq!(blocks[1].block().hash(), second_hash);
2070 assert_eq!(
2071 range_access_lists(&blocks),
2072 vec![Some(sealed_access_list(first_access_list)), None, None]
2073 );
2074 assert_eq!(bad_messages.load(Ordering::SeqCst), 1);
2075 }
2076
2077 #[tokio::test]
2078 async fn download_full_block_range_with_invalid_header() {
2079 let client = TestFullBlockClient::default();
2080 let range_length: usize = 3;
2081 let (header, _) = insert_headers_into_client(&client, 0..range_length);
2082
2083 let test_consensus = reth_consensus::test_utils::TestConsensus::default();
2084 test_consensus.set_fail_validation(true);
2085 test_consensus.set_fail_body_against_header(false);
2086 let client = FullBlockClient::new(client, Arc::new(test_consensus));
2087
2088 let received = client.get_full_block_range(header.hash(), range_length as u64).await;
2089
2090 assert_eq!(received.len(), range_length);
2091 for (i, block) in received.iter().enumerate() {
2092 let expected_number = header.number - i as u64;
2093 assert_eq!(block.number, expected_number);
2094 }
2095 }
2096}