reth_engine_tree/
download.rs

1//! Handler that can download blocks on demand (e.g. from the network).
2
3use crate::{engine::DownloadRequest, metrics::BlockDownloaderMetrics};
4use alloy_consensus::BlockHeader;
5use alloy_primitives::B256;
6use futures::FutureExt;
7use reth_consensus::{Consensus, ConsensusError};
8use reth_network_p2p::{
9    full_block::{FetchFullBlockFuture, FetchFullBlockRangeFuture, FullBlockClient},
10    BlockClient,
11};
12use reth_primitives_traits::{Block, RecoveredBlock, SealedBlock};
13use std::{
14    cmp::{Ordering, Reverse},
15    collections::{binary_heap::PeekMut, BinaryHeap, HashSet, VecDeque},
16    fmt::Debug,
17    sync::Arc,
18    task::{Context, Poll},
19};
20use tracing::trace;
21
22/// A trait that can download blocks on demand.
23pub trait BlockDownloader: Send + Sync {
24    /// Type of the block being downloaded.
25    type Block: Block;
26
27    /// Handle an action.
28    fn on_action(&mut self, action: DownloadAction);
29
30    /// Advance in progress requests if any
31    fn poll(&mut self, cx: &mut Context<'_>) -> Poll<DownloadOutcome<Self::Block>>;
32}
33
34/// Actions that can be performed by the block downloader.
35#[derive(Debug)]
36pub enum DownloadAction {
37    /// Stop downloading blocks.
38    Clear,
39    /// Download given blocks
40    Download(DownloadRequest),
41}
42
43/// Outcome of downloaded blocks.
44#[derive(Debug)]
45pub enum DownloadOutcome<B: Block> {
46    /// Downloaded blocks.
47    Blocks(Vec<RecoveredBlock<B>>),
48    /// New download started.
49    NewDownloadStarted {
50        /// How many blocks are pending in this download.
51        remaining_blocks: u64,
52        /// The hash of the highest block of this download.
53        target: B256,
54    },
55}
56
57/// Basic [`BlockDownloader`].
58#[allow(missing_debug_implementations)]
59pub struct BasicBlockDownloader<Client, B: Block>
60where
61    Client: BlockClient + 'static,
62{
63    /// A downloader that can download full blocks from the network.
64    full_block_client: FullBlockClient<Client>,
65    /// In-flight full block requests in progress.
66    inflight_full_block_requests: Vec<FetchFullBlockFuture<Client>>,
67    /// In-flight full block _range_ requests in progress.
68    inflight_block_range_requests: Vec<FetchFullBlockRangeFuture<Client>>,
69    /// Buffered blocks from downloads - this is a min-heap of blocks, using the block number for
70    /// ordering. This means the blocks will be popped from the heap with ascending block numbers.
71    set_buffered_blocks: BinaryHeap<Reverse<OrderedRecoveredBlock<B>>>,
72    /// Engine download metrics.
73    metrics: BlockDownloaderMetrics,
74    /// Pending events to be emitted.
75    pending_events: VecDeque<DownloadOutcome<B>>,
76}
77
78impl<Client, B> BasicBlockDownloader<Client, B>
79where
80    Client: BlockClient<Block = B> + 'static,
81    B: Block,
82{
83    /// Create a new instance
84    pub fn new(client: Client, consensus: Arc<dyn Consensus<B, Error = ConsensusError>>) -> Self {
85        Self {
86            full_block_client: FullBlockClient::new(client, consensus),
87            inflight_full_block_requests: Vec::new(),
88            inflight_block_range_requests: Vec::new(),
89            set_buffered_blocks: BinaryHeap::new(),
90            metrics: BlockDownloaderMetrics::default(),
91            pending_events: Default::default(),
92        }
93    }
94
95    /// Clears the stored inflight requests.
96    fn clear(&mut self) {
97        self.inflight_full_block_requests.clear();
98        self.inflight_block_range_requests.clear();
99        self.set_buffered_blocks.clear();
100        self.update_block_download_metrics();
101    }
102
103    /// Processes a download request.
104    fn download(&mut self, request: DownloadRequest) {
105        match request {
106            DownloadRequest::BlockSet(hashes) => self.download_block_set(hashes),
107            DownloadRequest::BlockRange(hash, count) => self.download_block_range(hash, count),
108        }
109    }
110
111    /// Processes a block set download request.
112    fn download_block_set(&mut self, hashes: HashSet<B256>) {
113        for hash in hashes {
114            self.download_full_block(hash);
115        }
116    }
117
118    /// Processes a block range download request.
119    fn download_block_range(&mut self, hash: B256, count: u64) {
120        if count == 1 {
121            self.download_full_block(hash);
122        } else {
123            trace!(
124                target: "consensus::engine",
125                ?hash,
126                ?count,
127                "start downloading full block range."
128            );
129
130            let request = self.full_block_client.get_full_block_range(hash, count);
131            self.push_pending_event(DownloadOutcome::NewDownloadStarted {
132                remaining_blocks: request.count(),
133                target: request.start_hash(),
134            });
135            self.inflight_block_range_requests.push(request);
136        }
137    }
138
139    /// Starts requesting a full block from the network.
140    ///
141    /// Returns `true` if the request was started, `false` if there's already a request for the
142    /// given hash.
143    fn download_full_block(&mut self, hash: B256) -> bool {
144        if self.is_inflight_request(hash) {
145            return false
146        }
147        self.push_pending_event(DownloadOutcome::NewDownloadStarted {
148            remaining_blocks: 1,
149            target: hash,
150        });
151
152        trace!(
153            target: "consensus::engine::sync",
154            ?hash,
155            "Start downloading full block"
156        );
157
158        let request = self.full_block_client.get_full_block(hash);
159        self.inflight_full_block_requests.push(request);
160
161        self.update_block_download_metrics();
162
163        true
164    }
165
166    /// Returns true if there's already a request for the given hash.
167    fn is_inflight_request(&self, hash: B256) -> bool {
168        self.inflight_full_block_requests.iter().any(|req| *req.hash() == hash)
169    }
170
171    /// Sets the metrics for the active downloads
172    fn update_block_download_metrics(&self) {
173        let blocks = self.inflight_full_block_requests.len() +
174            self.inflight_block_range_requests.iter().map(|r| r.count() as usize).sum::<usize>();
175        self.metrics.active_block_downloads.set(blocks as f64);
176    }
177
178    /// Adds a pending event to the FIFO queue.
179    fn push_pending_event(&mut self, pending_event: DownloadOutcome<B>) {
180        self.pending_events.push_back(pending_event);
181    }
182
183    /// Removes a pending event from the FIFO queue.
184    fn pop_pending_event(&mut self) -> Option<DownloadOutcome<B>> {
185        self.pending_events.pop_front()
186    }
187}
188
189impl<Client, B> BlockDownloader for BasicBlockDownloader<Client, B>
190where
191    Client: BlockClient<Block = B>,
192    B: Block,
193{
194    type Block = B;
195
196    /// Handles incoming download actions.
197    fn on_action(&mut self, action: DownloadAction) {
198        match action {
199            DownloadAction::Clear => self.clear(),
200            DownloadAction::Download(request) => self.download(request),
201        }
202    }
203
204    /// Advances the download process.
205    fn poll(&mut self, cx: &mut Context<'_>) -> Poll<DownloadOutcome<B>> {
206        if let Some(pending_event) = self.pop_pending_event() {
207            return Poll::Ready(pending_event);
208        }
209
210        // advance all full block requests
211        for idx in (0..self.inflight_full_block_requests.len()).rev() {
212            let mut request = self.inflight_full_block_requests.swap_remove(idx);
213            if let Poll::Ready(block) = request.poll_unpin(cx) {
214                trace!(target: "consensus::engine", block=?block.num_hash(), "Received single full block, buffering");
215                self.set_buffered_blocks.push(Reverse(block.into()));
216            } else {
217                // still pending
218                self.inflight_full_block_requests.push(request);
219            }
220        }
221
222        // advance all full block range requests
223        for idx in (0..self.inflight_block_range_requests.len()).rev() {
224            let mut request = self.inflight_block_range_requests.swap_remove(idx);
225            if let Poll::Ready(blocks) = request.poll_unpin(cx) {
226                trace!(target: "consensus::engine", len=?blocks.len(), first=?blocks.first().map(|b| b.num_hash()), last=?blocks.last().map(|b| b.num_hash()), "Received full block range, buffering");
227                self.set_buffered_blocks.extend(
228                    blocks
229                        .into_iter()
230                        .map(|b| {
231                            let senders = b.senders().unwrap_or_default();
232                            OrderedRecoveredBlock(RecoveredBlock::new_sealed(b, senders))
233                        })
234                        .map(Reverse),
235                );
236            } else {
237                // still pending
238                self.inflight_block_range_requests.push(request);
239            }
240        }
241
242        self.update_block_download_metrics();
243
244        if self.set_buffered_blocks.is_empty() {
245            return Poll::Pending;
246        }
247
248        // drain all unique element of the block buffer if there are any
249        let mut downloaded_blocks: Vec<RecoveredBlock<B>> =
250            Vec::with_capacity(self.set_buffered_blocks.len());
251        while let Some(block) = self.set_buffered_blocks.pop() {
252            // peek ahead and pop duplicates
253            while let Some(peek) = self.set_buffered_blocks.peek_mut() {
254                if peek.0 .0.hash() == block.0 .0.hash() {
255                    PeekMut::pop(peek);
256                } else {
257                    break
258                }
259            }
260            downloaded_blocks.push(block.0.into());
261        }
262        Poll::Ready(DownloadOutcome::Blocks(downloaded_blocks))
263    }
264}
265
266/// A wrapper type around [`RecoveredBlock`] that implements the [Ord]
267/// trait by block number.
268#[derive(Debug, Clone, PartialEq, Eq)]
269struct OrderedRecoveredBlock<B: Block>(RecoveredBlock<B>);
270
271impl<B: Block> PartialOrd for OrderedRecoveredBlock<B> {
272    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
273        Some(self.cmp(other))
274    }
275}
276
277impl<B: Block> Ord for OrderedRecoveredBlock<B> {
278    fn cmp(&self, other: &Self) -> Ordering {
279        self.0.number().cmp(&other.0.number())
280    }
281}
282
283impl<B: Block> From<SealedBlock<B>> for OrderedRecoveredBlock<B> {
284    fn from(block: SealedBlock<B>) -> Self {
285        let senders = block.senders().unwrap_or_default();
286        Self(RecoveredBlock::new_sealed(block, senders))
287    }
288}
289
290impl<B: Block> From<OrderedRecoveredBlock<B>> for RecoveredBlock<B> {
291    fn from(value: OrderedRecoveredBlock<B>) -> Self {
292        value.0
293    }
294}
295
/// A [`BlockDownloader`] that ignores every action and never produces an outcome.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct NoopBlockDownloader<B>(core::marker::PhantomData<B>);
300
301impl<B: Block> BlockDownloader for NoopBlockDownloader<B> {
302    type Block = B;
303
304    fn on_action(&mut self, _event: DownloadAction) {}
305
306    fn poll(&mut self, _cx: &mut Context<'_>) -> Poll<DownloadOutcome<B>> {
307        Poll::Pending
308    }
309}
310
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_utils::insert_headers_into_client;
    use alloy_consensus::Header;
    use alloy_eips::eip1559::ETHEREUM_BLOCK_GAS_LIMIT_30M;
    use assert_matches::assert_matches;
    use reth_chainspec::{ChainSpecBuilder, MAINNET};
    use reth_ethereum_consensus::EthBeaconConsensus;
    use reth_network_p2p::test_utils::TestFullBlockClient;
    use reth_primitives_traits::SealedHeader;
    use std::{future::poll_fn, sync::Arc};

    /// A downloader under test together with the client that serves its requests.
    struct TestHarness {
        block_downloader:
            BasicBlockDownloader<TestFullBlockClient, reth_ethereum_primitives::Block>,
        client: TestFullBlockClient,
    }

    impl TestHarness {
        /// Builds a harness whose client has been populated through
        /// `insert_headers_into_client` with the range `0..total_blocks`.
        fn new(total_blocks: usize) -> Self {
            let client = TestFullBlockClient::default();
            let seed_header = SealedHeader::seal_slow(Header {
                base_fee_per_gas: Some(7),
                gas_limit: ETHEREUM_BLOCK_GAS_LIMIT_30M,
                ..Default::default()
            });
            insert_headers_into_client(&client, seed_header, 0..total_blocks);

            let chain_spec = Arc::new(
                ChainSpecBuilder::default()
                    .chain(MAINNET.chain)
                    .genesis(MAINNET.genesis.clone())
                    .paris_activated()
                    .build(),
            );
            let consensus = Arc::new(EthBeaconConsensus::new(chain_spec));

            Self { block_downloader: BasicBlockDownloader::new(client.clone(), consensus), client }
        }
    }

    #[tokio::test]
    async fn block_downloader_range_request() {
        const TOTAL_BLOCKS: usize = 10;
        let TestHarness { mut block_downloader, client } = TestHarness::new(TOTAL_BLOCKS);
        let tip = client.highest_block().expect("there should be blocks here");

        // request the whole range ending at the tip
        let range_request = DownloadRequest::BlockRange(tip.hash(), tip.number);
        block_downloader.on_action(DownloadAction::Download(range_request));

        // exactly one range request must be in flight
        assert_eq!(block_downloader.inflight_block_range_requests.len(), 1);

        // and it must describe the requested range
        let inflight = block_downloader.inflight_block_range_requests.first().unwrap();
        assert_eq!(inflight.start_hash(), tip.hash());
        assert_eq!(inflight.count(), tip.number);

        // first poll announces the new download
        let outcome = poll_fn(|cx| block_downloader.poll(cx)).await;
        assert_matches!(outcome, DownloadOutcome::NewDownloadStarted { remaining_blocks, .. } => {
            assert_eq!(remaining_blocks, TOTAL_BLOCKS as u64);
        });

        // second poll delivers the blocks
        let outcome = poll_fn(|cx| block_downloader.poll(cx)).await;
        assert_matches!(outcome, DownloadOutcome::Blocks(blocks) => {
            // every block was obtained
            assert_eq!(blocks.len(), TOTAL_BLOCKS);

            // and they arrive in ascending order, starting at block 1
            for (idx, block) in blocks.iter().enumerate() {
                assert_eq!(block.number(), idx as u64 + 1);
            }
        });
    }

    #[tokio::test]
    async fn block_downloader_set_request() {
        const TOTAL_BLOCKS: usize = 2;
        let TestHarness { mut block_downloader, client } = TestHarness::new(TOTAL_BLOCKS);

        let tip = client.highest_block().expect("there should be blocks here");

        // request the tip and its parent as a set
        let wanted = HashSet::from([tip.hash(), tip.parent_hash]);
        block_downloader.on_action(DownloadAction::Download(DownloadRequest::BlockSet(wanted)));

        // one single-block request per hash must be in flight
        assert_eq!(block_downloader.inflight_full_block_requests.len(), TOTAL_BLOCKS);

        // each request first announces itself
        for _ in 0..TOTAL_BLOCKS {
            let outcome = poll_fn(|cx| block_downloader.poll(cx)).await;
            assert_matches!(outcome, DownloadOutcome::NewDownloadStarted { remaining_blocks, .. } => {
                assert_eq!(remaining_blocks, 1);
            });
        }

        // afterwards the blocks are delivered
        let outcome = poll_fn(|cx| block_downloader.poll(cx)).await;
        assert_matches!(outcome, DownloadOutcome::Blocks(blocks) => {
            // every block was obtained
            assert_eq!(blocks.len(), TOTAL_BLOCKS);

            // and they arrive in ascending order, starting at block 1
            for (idx, block) in blocks.iter().enumerate() {
                assert_eq!(block.number(), idx as u64 + 1);
            }
        });
    }

    #[tokio::test]
    async fn block_downloader_clear_request() {
        const TOTAL_BLOCKS: usize = 10;
        let TestHarness { mut block_downloader, client } = TestHarness::new(TOTAL_BLOCKS);

        let tip = client.highest_block().expect("there should be blocks here");

        // queue a range request ...
        let range_request = DownloadRequest::BlockRange(tip.hash(), tip.number);
        block_downloader.on_action(DownloadAction::Download(range_request));

        // ... followed by a set request
        let download_set = HashSet::from([tip.hash(), tip.parent_hash]);
        block_downloader
            .on_action(DownloadAction::Download(DownloadRequest::BlockSet(download_set.clone())));

        // exactly one range request must be in flight
        assert_eq!(block_downloader.inflight_block_range_requests.len(), 1);

        // and it must describe the requested range
        let inflight = block_downloader.inflight_block_range_requests.first().unwrap();
        assert_eq!(inflight.start_hash(), tip.hash());
        assert_eq!(inflight.count(), tip.number);

        // one single-block request per hash of the set must be in flight
        assert_eq!(block_downloader.inflight_full_block_requests.len(), download_set.len());

        // clearing must drop all of them
        block_downloader.on_action(DownloadAction::Clear);

        assert_eq!(block_downloader.inflight_block_range_requests.len(), 0);
        assert_eq!(block_downloader.inflight_full_block_requests.len(), 0);
    }
}
474}