1use crate::{engine::DownloadRequest, metrics::BlockDownloaderMetrics};
4use alloy_consensus::BlockHeader;
5use alloy_primitives::B256;
6use futures::FutureExt;
7use reth_consensus::{Consensus, ConsensusError};
8use reth_network_p2p::{
9 full_block::{FetchFullBlockFuture, FetchFullBlockRangeFuture, FullBlockClient},
10 BlockClient,
11};
12use reth_primitives_traits::{Block, RecoveredBlock, SealedBlock};
13use std::{
14 cmp::{Ordering, Reverse},
15 collections::{binary_heap::PeekMut, BinaryHeap, HashSet, VecDeque},
16 fmt::Debug,
17 sync::Arc,
18 task::{Context, Poll},
19};
20use tracing::trace;
21
/// A component that downloads blocks on request.
///
/// Work is submitted through [`BlockDownloader::on_action`] and results are retrieved by
/// driving [`BlockDownloader::poll`].
pub trait BlockDownloader: Send + Sync {
    /// The block type this downloader yields.
    type Block: Block;

    /// Applies the given [`DownloadAction`] to the downloader.
    fn on_action(&mut self, action: DownloadAction);

    /// Polls the downloader for its next [`DownloadOutcome`].
    fn poll(&mut self, cx: &mut Context<'_>) -> Poll<DownloadOutcome<Self::Block>>;
}
33
/// Instructions that can be handed to a [`BlockDownloader`].
#[derive(Debug)]
pub enum DownloadAction {
    /// Reset the downloader. `BasicBlockDownloader` reacts by dropping all in-flight requests
    /// and all buffered blocks.
    Clear,
    /// Start the download described by the wrapped [`DownloadRequest`].
    Download(DownloadRequest),
}
42
/// Results produced by polling a [`BlockDownloader`].
#[derive(Debug)]
pub enum DownloadOutcome<B: Block> {
    /// Blocks that finished downloading.
    Blocks(Vec<RecoveredBlock<B>>),
    /// A new download request was issued.
    NewDownloadStarted {
        /// Number of blocks the new request covers (`1` for a single-block request).
        remaining_blocks: u64,
        /// Hash the request was started from.
        target: B256,
    },
}
56
/// [`BlockDownloader`] implementation that fetches blocks through a [`FullBlockClient`].
#[allow(missing_debug_implementations)]
pub struct BasicBlockDownloader<Client, B: Block>
where
    Client: BlockClient + 'static,
{
    /// Client used to create single-block and block-range requests.
    full_block_client: FullBlockClient<Client>,
    /// Single-block requests that have not completed yet.
    inflight_full_block_requests: Vec<FetchFullBlockFuture<Client>>,
    /// Block-range requests that have not completed yet.
    inflight_block_range_requests: Vec<FetchFullBlockRangeFuture<Client>>,
    /// Downloaded blocks waiting to be returned from `poll`. Wrapping the entries in
    /// [`Reverse`] turns the max-heap into a min-heap, so the lowest-ordered block pops first.
    set_buffered_blocks: BinaryHeap<Reverse<OrderedRecoveredBlock<B>>>,
    /// Metrics handle; the `active_block_downloads` gauge is updated from it.
    metrics: BlockDownloaderMetrics,
    /// Outcomes queued for delivery; `poll` returns these before anything else.
    pending_events: VecDeque<DownloadOutcome<B>>,
}
77
78impl<Client, B> BasicBlockDownloader<Client, B>
79where
80 Client: BlockClient<Block = B> + 'static,
81 B: Block,
82{
83 pub fn new(client: Client, consensus: Arc<dyn Consensus<B, Error = ConsensusError>>) -> Self {
85 Self {
86 full_block_client: FullBlockClient::new(client, consensus),
87 inflight_full_block_requests: Vec::new(),
88 inflight_block_range_requests: Vec::new(),
89 set_buffered_blocks: BinaryHeap::new(),
90 metrics: BlockDownloaderMetrics::default(),
91 pending_events: Default::default(),
92 }
93 }
94
95 fn clear(&mut self) {
97 self.inflight_full_block_requests.clear();
98 self.inflight_block_range_requests.clear();
99 self.set_buffered_blocks.clear();
100 self.update_block_download_metrics();
101 }
102
103 fn download(&mut self, request: DownloadRequest) {
105 match request {
106 DownloadRequest::BlockSet(hashes) => self.download_block_set(hashes),
107 DownloadRequest::BlockRange(hash, count) => self.download_block_range(hash, count),
108 }
109 }
110
111 fn download_block_set(&mut self, hashes: HashSet<B256>) {
113 for hash in hashes {
114 self.download_full_block(hash);
115 }
116 }
117
118 fn download_block_range(&mut self, hash: B256, count: u64) {
120 if count == 1 {
121 self.download_full_block(hash);
122 } else {
123 trace!(
124 target: "consensus::engine",
125 ?hash,
126 ?count,
127 "start downloading full block range."
128 );
129
130 let request = self.full_block_client.get_full_block_range(hash, count);
131 self.push_pending_event(DownloadOutcome::NewDownloadStarted {
132 remaining_blocks: request.count(),
133 target: request.start_hash(),
134 });
135 self.inflight_block_range_requests.push(request);
136 }
137 }
138
139 fn download_full_block(&mut self, hash: B256) -> bool {
144 if self.is_inflight_request(hash) {
145 return false
146 }
147 self.push_pending_event(DownloadOutcome::NewDownloadStarted {
148 remaining_blocks: 1,
149 target: hash,
150 });
151
152 trace!(
153 target: "consensus::engine::sync",
154 ?hash,
155 "Start downloading full block"
156 );
157
158 let request = self.full_block_client.get_full_block(hash);
159 self.inflight_full_block_requests.push(request);
160
161 self.update_block_download_metrics();
162
163 true
164 }
165
166 fn is_inflight_request(&self, hash: B256) -> bool {
168 self.inflight_full_block_requests.iter().any(|req| *req.hash() == hash)
169 }
170
171 fn update_block_download_metrics(&self) {
173 let blocks = self.inflight_full_block_requests.len() +
174 self.inflight_block_range_requests.iter().map(|r| r.count() as usize).sum::<usize>();
175 self.metrics.active_block_downloads.set(blocks as f64);
176 }
177
178 fn push_pending_event(&mut self, pending_event: DownloadOutcome<B>) {
180 self.pending_events.push_back(pending_event);
181 }
182
183 fn pop_pending_event(&mut self) -> Option<DownloadOutcome<B>> {
185 self.pending_events.pop_front()
186 }
187}
188
189impl<Client, B> BlockDownloader for BasicBlockDownloader<Client, B>
190where
191 Client: BlockClient<Block = B>,
192 B: Block,
193{
194 type Block = B;
195
196 fn on_action(&mut self, action: DownloadAction) {
198 match action {
199 DownloadAction::Clear => self.clear(),
200 DownloadAction::Download(request) => self.download(request),
201 }
202 }
203
204 fn poll(&mut self, cx: &mut Context<'_>) -> Poll<DownloadOutcome<B>> {
206 if let Some(pending_event) = self.pop_pending_event() {
207 return Poll::Ready(pending_event);
208 }
209
210 for idx in (0..self.inflight_full_block_requests.len()).rev() {
212 let mut request = self.inflight_full_block_requests.swap_remove(idx);
213 if let Poll::Ready(block) = request.poll_unpin(cx) {
214 trace!(target: "consensus::engine", block=?block.num_hash(), "Received single full block, buffering");
215 self.set_buffered_blocks.push(Reverse(block.into()));
216 } else {
217 self.inflight_full_block_requests.push(request);
219 }
220 }
221
222 for idx in (0..self.inflight_block_range_requests.len()).rev() {
224 let mut request = self.inflight_block_range_requests.swap_remove(idx);
225 if let Poll::Ready(blocks) = request.poll_unpin(cx) {
226 trace!(target: "consensus::engine", len=?blocks.len(), first=?blocks.first().map(|b| b.num_hash()), last=?blocks.last().map(|b| b.num_hash()), "Received full block range, buffering");
227 self.set_buffered_blocks.extend(
228 blocks
229 .into_iter()
230 .map(|b| {
231 let senders = b.senders().unwrap_or_default();
232 OrderedRecoveredBlock(RecoveredBlock::new_sealed(b, senders))
233 })
234 .map(Reverse),
235 );
236 } else {
237 self.inflight_block_range_requests.push(request);
239 }
240 }
241
242 self.update_block_download_metrics();
243
244 if self.set_buffered_blocks.is_empty() {
245 return Poll::Pending;
246 }
247
248 let mut downloaded_blocks: Vec<RecoveredBlock<B>> =
250 Vec::with_capacity(self.set_buffered_blocks.len());
251 while let Some(block) = self.set_buffered_blocks.pop() {
252 while let Some(peek) = self.set_buffered_blocks.peek_mut() {
254 if peek.0 .0.hash() == block.0 .0.hash() {
255 PeekMut::pop(peek);
256 } else {
257 break
258 }
259 }
260 downloaded_blocks.push(block.0.into());
261 }
262 Poll::Ready(DownloadOutcome::Blocks(downloaded_blocks))
263 }
264}
265
/// Newtype around [`RecoveredBlock`] whose ordering is supplied by the `Ord` implementation in
/// this file, which lets it be stored in a `BinaryHeap`.
#[derive(Debug, Clone, PartialEq, Eq)]
struct OrderedRecoveredBlock<B: Block>(RecoveredBlock<B>);
270
271impl<B: Block> PartialOrd for OrderedRecoveredBlock<B> {
272 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
273 Some(self.cmp(other))
274 }
275}
276
277impl<B: Block> Ord for OrderedRecoveredBlock<B> {
278 fn cmp(&self, other: &Self) -> Ordering {
279 self.0.number().cmp(&other.0.number())
280 }
281}
282
283impl<B: Block> From<SealedBlock<B>> for OrderedRecoveredBlock<B> {
284 fn from(block: SealedBlock<B>) -> Self {
285 let senders = block.senders().unwrap_or_default();
286 Self(RecoveredBlock::new_sealed(block, senders))
287 }
288}
289
290impl<B: Block> From<OrderedRecoveredBlock<B>> for RecoveredBlock<B> {
291 fn from(value: OrderedRecoveredBlock<B>) -> Self {
292 value.0
293 }
294}
295
/// A [`BlockDownloader`] that ignores every action and never produces an outcome.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct NoopBlockDownloader<B>(core::marker::PhantomData<B>);
300
301impl<B: Block> BlockDownloader for NoopBlockDownloader<B> {
302 type Block = B;
303
304 fn on_action(&mut self, _event: DownloadAction) {}
305
306 fn poll(&mut self, _cx: &mut Context<'_>) -> Poll<DownloadOutcome<B>> {
307 Poll::Pending
308 }
309}
310
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_utils::insert_headers_into_client;
    use alloy_consensus::Header;
    use alloy_eips::eip1559::ETHEREUM_BLOCK_GAS_LIMIT_30M;
    use assert_matches::assert_matches;
    use reth_chainspec::{ChainSpecBuilder, MAINNET};
    use reth_ethereum_consensus::EthBeaconConsensus;
    use reth_network_p2p::test_utils::TestFullBlockClient;
    use reth_primitives_traits::SealedHeader;
    use std::{future::poll_fn, sync::Arc};

    /// Concrete downloader type exercised by these tests.
    type TestDownloader =
        BasicBlockDownloader<TestFullBlockClient, reth_ethereum_primitives::Block>;

    /// Downloader under test together with the mock client that serves its requests.
    struct TestHarness {
        block_downloader: TestDownloader,
        client: TestFullBlockClient,
    }

    impl TestHarness {
        /// Builds a harness whose client has been seeded through `insert_headers_into_client`
        /// with the range `0..total_blocks`.
        fn new(total_blocks: usize) -> Self {
            let chain_spec = Arc::new(
                ChainSpecBuilder::default()
                    .chain(MAINNET.chain)
                    .genesis(MAINNET.genesis.clone())
                    .paris_activated()
                    .build(),
            );

            let client = TestFullBlockClient::default();
            let first_header = SealedHeader::seal_slow(Header {
                base_fee_per_gas: Some(7),
                gas_limit: ETHEREUM_BLOCK_GAS_LIMIT_30M,
                ..Default::default()
            });
            insert_headers_into_client(&client, first_header, 0..total_blocks);

            let consensus = Arc::new(EthBeaconConsensus::new(chain_spec));
            Self { block_downloader: BasicBlockDownloader::new(client.clone(), consensus), client }
        }
    }

    /// Awaits the next outcome produced by the downloader.
    async fn next_outcome(
        downloader: &mut TestDownloader,
    ) -> DownloadOutcome<reth_ethereum_primitives::Block> {
        poll_fn(|cx| downloader.poll(cx)).await
    }

    #[tokio::test]
    async fn block_downloader_range_request() {
        const TOTAL_BLOCKS: usize = 10;
        let TestHarness { mut block_downloader, client } = TestHarness::new(TOTAL_BLOCKS);
        let tip = client.highest_block().expect("there should be blocks here");

        // Request the whole chain, counting back from the tip.
        let range_request = DownloadRequest::BlockRange(tip.hash(), tip.number);
        block_downloader.on_action(DownloadAction::Download(range_request));

        // Exactly one range request must be registered, and it must mirror what was asked for.
        assert_eq!(block_downloader.inflight_block_range_requests.len(), 1);
        let first_req = block_downloader.inflight_block_range_requests.first().unwrap();
        assert_eq!(first_req.start_hash(), tip.hash());
        assert_eq!(first_req.count(), tip.number);

        // The start notification is delivered before any block.
        let started = next_outcome(&mut block_downloader).await;
        assert_matches!(started, DownloadOutcome::NewDownloadStarted { remaining_blocks, .. } => {
            assert_eq!(remaining_blocks, TOTAL_BLOCKS as u64);
        });

        // Then all blocks arrive, in ascending number order starting at 1.
        let delivered = next_outcome(&mut block_downloader).await;
        assert_matches!(delivered, DownloadOutcome::Blocks(blocks) => {
            assert_eq!(blocks.len(), TOTAL_BLOCKS);

            for (idx, block) in blocks.iter().enumerate() {
                assert_eq!(block.number(), idx as u64 + 1);
            }
        });
    }

    #[tokio::test]
    async fn block_downloader_set_request() {
        const TOTAL_BLOCKS: usize = 2;
        let TestHarness { mut block_downloader, client } = TestHarness::new(TOTAL_BLOCKS);

        let tip = client.highest_block().expect("there should be blocks here");

        // Request the tip and its parent as a set.
        let wanted = HashSet::from([tip.hash(), tip.parent_hash]);
        block_downloader.on_action(DownloadAction::Download(DownloadRequest::BlockSet(wanted)));

        // One single-block request per hash.
        assert_eq!(block_downloader.inflight_full_block_requests.len(), TOTAL_BLOCKS);

        // Each of those requests announces itself with a one-block start notification.
        for _ in 0..TOTAL_BLOCKS {
            let started = next_outcome(&mut block_downloader).await;
            assert_matches!(started, DownloadOutcome::NewDownloadStarted { remaining_blocks, .. } => {
                assert_eq!(remaining_blocks, 1);
            });
        }

        // Both blocks come back together, in ascending number order starting at 1.
        let delivered = next_outcome(&mut block_downloader).await;
        assert_matches!(delivered, DownloadOutcome::Blocks(blocks) => {
            assert_eq!(blocks.len(), TOTAL_BLOCKS);

            for (idx, block) in blocks.iter().enumerate() {
                assert_eq!(block.number(), idx as u64 + 1);
            }
        });
    }

    #[tokio::test]
    async fn block_downloader_clear_request() {
        const TOTAL_BLOCKS: usize = 10;
        let TestHarness { mut block_downloader, client } = TestHarness::new(TOTAL_BLOCKS);

        let tip = client.highest_block().expect("there should be blocks here");

        // Queue a range request followed by a set request.
        let range_request = DownloadRequest::BlockRange(tip.hash(), tip.number);
        block_downloader.on_action(DownloadAction::Download(range_request));

        let download_set = HashSet::from([tip.hash(), tip.parent_hash]);
        let set_request = DownloadRequest::BlockSet(download_set.clone());
        block_downloader.on_action(DownloadAction::Download(set_request));

        // Both kinds of request must be in flight before clearing.
        assert_eq!(block_downloader.inflight_block_range_requests.len(), 1);
        let first_req = block_downloader.inflight_block_range_requests.first().unwrap();
        assert_eq!(first_req.start_hash(), tip.hash());
        assert_eq!(first_req.count(), tip.number);
        assert_eq!(block_downloader.inflight_full_block_requests.len(), download_set.len());

        block_downloader.on_action(DownloadAction::Clear);

        // Clearing drops every in-flight request.
        assert_eq!(block_downloader.inflight_block_range_requests.len(), 0);
        assert_eq!(block_downloader.inflight_full_block_requests.len(), 0);
    }
}