1use std::ops::RangeInclusive;
23use super::headers::client::HeadersRequest;
4use alloy_consensus::BlockHeader;
5use alloy_eips::BlockHashOrNumber;
6use alloy_primitives::{BlockNumber, B256};
7use derive_more::{Display, Error};
8use reth_consensus::ConsensusError;
9use reth_network_peers::WithPeerId;
10use reth_network_types::ReputationChangeKind;
11use reth_primitives_traits::{GotExpected, GotExpectedBoxed};
12use reth_storage_errors::{db::DatabaseError, provider::ProviderError};
13use tokio::sync::{mpsc, oneshot};
1415/// Result alias for result of a request.
16pub type RequestResult<T> = Result<T, RequestError>;
1718/// Result with [`PeerId`][reth_network_peers::PeerId]
19pub type PeerRequestResult<T> = RequestResult<WithPeerId<T>>;
2021/// Helper trait used to validate responses.
22pub trait EthResponseValidator {
23/// Determine whether the response matches what we requested in [`HeadersRequest`]
24fn is_likely_bad_headers_response(&self, request: &HeadersRequest) -> bool;
2526/// Return the response reputation impact if any
27fn reputation_change_err(&self) -> Option<ReputationChangeKind>;
28}
2930impl<H: BlockHeader> EthResponseValidatorfor RequestResult<Vec<H>> {
31fn is_likely_bad_headers_response(&self, request: &HeadersRequest) -> bool {
32match self{
33Ok(headers) => {
34let request_length = headers.len() as u64;
3536if request_length <= 1 && request.limit != request_length {
37return true
38}
3940match request.start {
41 BlockHashOrNumber::Number(block_number) => {
42headers.first().is_some_and(|header| block_number != header.number())
43 }
44 BlockHashOrNumber::Hash(_) => {
45// we don't want to hash the header
46false
47}
48 }
49 }
50Err(_) => true,
51 }
52 }
5354/// [`RequestError::ChannelClosed`] is not possible here since these errors are mapped to
55 /// `ConnectionDropped`, which will be handled when the dropped connection is cleaned up.
56 ///
57 /// [`RequestError::ConnectionDropped`] should be ignored here because this is already handled
58 /// when the dropped connection is handled.
59 ///
60 /// [`RequestError::UnsupportedCapability`] is not used yet because we only support active
61 /// session for eth protocol.
62fn reputation_change_err(&self) -> Option<ReputationChangeKind> {
63if let Err(err) = self{
64match err {
65RequestError::ChannelClosed |
66RequestError::ConnectionDropped |
67RequestError::UnsupportedCapability |
68RequestError::BadResponse => None,
69RequestError::Timeout => Some(ReputationChangeKind::Timeout),
70 }
71 } else {
72None73 }
74 }
75}
7677/// Error variants that can happen when sending requests to a session.
78///
79/// Represents errors encountered when sending requests.
80#[derive(Clone, Debug, Eq, PartialEq, Display, Error)]
81pub enum RequestError {
82/// Closed channel to the peer.
83 /// Indicates the channel to the peer is closed.
84#[display("closed channel to the peer")]
85ChannelClosed,
86/// Connection to a peer dropped while handling the request.
87 /// Represents a dropped connection while handling the request.
88#[display("connection to a peer dropped while handling the request")]
89ConnectionDropped,
90/// Capability message is not supported by the remote peer.
91 /// Indicates an unsupported capability message from the remote peer.
92#[display("capability message is not supported by remote peer")]
93UnsupportedCapability,
94/// Request timed out while awaiting response.
95 /// Represents a timeout while waiting for a response.
96#[display("request timed out while awaiting response")]
97Timeout,
98/// Received bad response.
99 /// Indicates a bad response was received.
100#[display("received bad response")]
101BadResponse,
102}
// === impl RequestError ===
105106impl RequestError {
107/// Indicates whether this error is retryable or fatal.
108pub const fn is_retryable(&self) -> bool {
109matches!(self, Self::Timeout | Self::ConnectionDropped)
110 }
111112/// Whether the error happened because the channel was closed.
113pub const fn is_channel_closed(&self) -> bool {
114matches!(self, Self::ChannelClosed)
115 }
116}
117118impl<T> From<mpsc::error::SendError<T>> for RequestError {
119fn from(_: mpsc::error::SendError<T>) -> Self {
120Self::ChannelClosed
121 }
122}
123124impl From<oneshot::error::RecvError> for RequestError {
125fn from(_: oneshot::error::RecvError) -> Self {
126Self::ChannelClosed
127 }
128}
129130/// The download result type
131pub type DownloadResult<T> = Result<T, DownloadError>;
132133/// The downloader error type
134#[derive(Debug, Clone, Display, Error)]
135pub enum DownloadError {
136/* ==================== HEADER ERRORS ==================== */
137/// Header validation failed.
138#[display("failed to validate header {hash}, block number {number}: {error}")]
139HeaderValidation {
140/// Hash of header failing validation
141hash: B256,
142/// Number of header failing validation
143number: u64,
144/// The details of validation failure
145#[error(source)]
146error: Box<ConsensusError>,
147 },
148/// Received an invalid tip.
149#[display("received invalid tip: {_0}")]
150InvalidTip(GotExpectedBoxed<B256>),
151/// Received a tip with an invalid tip number.
152#[display("received invalid tip number: {_0}")]
153InvalidTipNumber(GotExpected<u64>),
154/// Received a response to a request with unexpected start block
155#[display("headers response starts at unexpected block: {_0}")]
156HeadersResponseStartBlockMismatch(GotExpected<u64>),
157/// Received headers with less than expected items.
158#[display("received less headers than expected: {_0}")]
159HeadersResponseTooShort(GotExpected<u64>),
160161/* ==================== BODIES ERRORS ==================== */
162/// Block validation failed
163#[display("failed to validate body for header {hash}, block number {number}: {error}")]
164BodyValidation {
165/// Hash of the block failing validation
166hash: B256,
167/// Number of the block failing validation
168number: u64,
169/// The details of validation failure
170error: Box<ConsensusError>,
171 },
172/// Received more bodies than requested.
173#[display("received more bodies than requested: {_0}")]
174TooManyBodies(GotExpected<usize>),
175/// Headers missing from the database.
176#[display("header missing from the database: {block_number}")]
177MissingHeader {
178/// Missing header block number.
179block_number: BlockNumber,
180 },
181/// Body range invalid
182#[display("requested body range is invalid: {range:?}")]
183InvalidBodyRange {
184/// Invalid block number range.
185range: RangeInclusive<BlockNumber>,
186 },
187/* ==================== COMMON ERRORS ==================== */
188/// Timed out while waiting for request id response.
189#[display("timed out while waiting for response")]
190Timeout,
191/// Received empty response while expecting non empty
192#[display("received empty response")]
193EmptyResponse,
194/// Error while executing the request.
195RequestError(RequestError),
196/// Provider error.
197Provider(ProviderError),
198}
199200impl From<DatabaseError> for DownloadError {
201fn from(error: DatabaseError) -> Self {
202Self::Provider(ProviderError::Database(error))
203 }
204}
205206impl From<RequestError> for DownloadError {
207fn from(error: RequestError) -> Self {
208Self::RequestError(error)
209 }
210}
211212impl From<ProviderError> for DownloadError {
213fn from(error: ProviderError) -> Self {
214Self::Provider(error)
215 }
216}
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_consensus::Header;

    /// Builds a [`HeadersRequest`] with start `0`, the given `limit` and the default direction.
    fn request_with_limit(limit: u64) -> HeadersRequest {
        HeadersRequest { start: 0u64.into(), limit, direction: Default::default() }
    }

    #[test]
    fn test_is_likely_bad_headers_response() {
        let empty_response: RequestResult<Vec<Header>> = Ok(Vec::new());

        // Nothing requested, nothing returned: not a bad response.
        assert!(!empty_response.is_likely_bad_headers_response(&request_with_limit(0)));

        // One header requested but none returned: flagged as bad.
        assert!(empty_response.is_likely_bad_headers_response(&request_with_limit(1)));
    }
}