// reth_network_p2p/error.rs
use std::ops::RangeInclusive;
use super::headers::client::HeadersRequest;
use alloy_consensus::BlockHeader;
use alloy_eips::BlockHashOrNumber;
use alloy_primitives::{BlockNumber, B256};
use derive_more::{Display, Error};
use reth_consensus::ConsensusError;
use reth_network_peers::WithPeerId;
use reth_network_types::ReputationChangeKind;
use reth_primitives::{GotExpected, GotExpectedBoxed};
use reth_storage_errors::{db::DatabaseError, provider::ProviderError};
use tokio::sync::{mpsc, oneshot};
/// Result alias for a request: either the response `T` or a [`RequestError`].
pub type RequestResult<T> = Result<T, RequestError>;
/// A [`RequestResult`] whose success value is wrapped in [`WithPeerId`].
pub type PeerRequestResult<T> = RequestResult<WithPeerId<T>>;
/// Helper trait used to validate responses to requests.
pub trait EthResponseValidator {
/// Returns `true` if the response likely does not match what was asked for in `request`.
fn is_likely_bad_headers_response(&self, request: &HeadersRequest) -> bool;
/// Returns the reputation change this response should cause, or `None` if there is none.
fn reputation_change_err(&self) -> Option<ReputationChangeKind>;
}
impl<H: BlockHeader> EthResponseValidator for RequestResult<Vec<H>> {
    /// Heuristically checks a headers response against the `request` that produced it.
    ///
    /// Returns `true` (likely bad) when:
    /// - the response is an error;
    /// - the response holds at most one header and that count differs from `request.limit`
    ///   (for example an empty response to a request for one header);
    /// - the request starts at a block number and the first returned header has a different
    ///   number.
    ///
    /// Requests that start at a block hash are not checked against the first header and yield
    /// `false` (presumably to avoid hashing the header here).
    fn is_likely_bad_headers_response(&self, request: &HeadersRequest) -> bool {
        let Ok(headers) = self else {
            // An error is never a good response.
            return true
        };

        let response_len = headers.len() as u64;

        // Only very short responses are compared against the requested limit; longer responses
        // are allowed to contain fewer headers than requested.
        if response_len <= 1 && request.limit != response_len {
            return true
        }

        match request.start {
            BlockHashOrNumber::Number(block_number) => {
                headers.first().is_some_and(|header| block_number != header.number())
            }
            BlockHashOrNumber::Hash(_) => false,
        }
    }

    /// Maps a failed request to the reputation change to apply, if any.
    ///
    /// - Successful responses, [`RequestError::ChannelClosed`],
    ///   [`RequestError::ConnectionDropped`] and [`RequestError::UnsupportedCapability`] carry
    ///   no reputation change here.
    /// - [`RequestError::Timeout`] maps to [`ReputationChangeKind::Timeout`].
    /// - [`RequestError::BadResponse`] maps to [`ReputationChangeKind::BadMessage`], so a bad
    ///   response is penalized rather than ignored.
    fn reputation_change_err(&self) -> Option<ReputationChangeKind> {
        let Err(err) = self else { return None };

        // Exhaustive on purpose: adding a `RequestError` variant must force a decision here.
        match err {
            RequestError::ChannelClosed |
            RequestError::ConnectionDropped |
            RequestError::UnsupportedCapability => None,
            RequestError::Timeout => Some(ReputationChangeKind::Timeout),
            RequestError::BadResponse => Some(ReputationChangeKind::BadMessage),
        }
    }
}
/// Errors that can occur when sending a request to a peer.
#[derive(Clone, Debug, Eq, PartialEq, Display, Error)]
pub enum RequestError {
/// The channel to the peer was closed.
#[display("closed channel to the peer")]
ChannelClosed,
/// The connection to the peer dropped while the request was being handled.
#[display("connection to a peer dropped while handling the request")]
ConnectionDropped,
/// The remote peer does not support the capability message.
#[display("capability message is not supported by remote peer")]
UnsupportedCapability,
/// The request timed out while awaiting a response.
#[display("request timed out while awaiting response")]
Timeout,
/// A bad response was received.
#[display("received bad response")]
BadResponse,
}
impl RequestError {
    /// Returns `true` for the errors considered retryable: [`Self::Timeout`] and
    /// [`Self::ConnectionDropped`].
    pub const fn is_retryable(&self) -> bool {
        match self {
            Self::Timeout | Self::ConnectionDropped => true,
            Self::ChannelClosed | Self::UnsupportedCapability | Self::BadResponse => false,
        }
    }

    /// Returns `true` only for [`Self::ChannelClosed`].
    pub const fn is_channel_closed(&self) -> bool {
        match self {
            Self::ChannelClosed => true,
            Self::ConnectionDropped |
            Self::UnsupportedCapability |
            Self::Timeout |
            Self::BadResponse => false,
        }
    }
}
impl<T> From<mpsc::error::SendError<T>> for RequestError {
fn from(_: mpsc::error::SendError<T>) -> Self {
Self::ChannelClosed
}
}
impl From<oneshot::error::RecvError> for RequestError {
fn from(_: oneshot::error::RecvError) -> Self {
Self::ChannelClosed
}
}
/// Result alias for download operations: either `T` or a [`DownloadError`].
pub type DownloadResult<T> = Result<T, DownloadError>;
/// Errors that can occur while downloading headers or bodies.
#[derive(Debug, Clone, PartialEq, Eq, Display, Error)]
pub enum DownloadError {
    /* ==================== HEADER ERRORS ==================== */
    /// Header validation failed.
    #[display("failed to validate header {hash}, block number {number}: {error}")]
    HeaderValidation {
        /// Hash of the header that failed validation.
        hash: B256,
        /// Block number of the header that failed validation.
        number: u64,
        /// The underlying consensus error.
        #[error(source)]
        error: Box<ConsensusError>,
    },
    /// Received an invalid tip.
    #[display("received invalid tip: {_0}")]
    InvalidTip(GotExpectedBoxed<B256>),
    /// Received a tip with an invalid number.
    #[display("received invalid tip number: {_0}")]
    InvalidTipNumber(GotExpected<u64>),
    /// The headers response starts at a block other than the expected one.
    #[display("headers response starts at unexpected block: {_0}")]
    HeadersResponseStartBlockMismatch(GotExpected<u64>),
    /// The headers response contains fewer headers than expected.
    #[display("received less headers than expected: {_0}")]
    HeadersResponseTooShort(GotExpected<u64>),

    /* ==================== BODY ERRORS ==================== */
    /// Body validation failed.
    #[display("failed to validate body for header {hash}, block number {number}: {error}")]
    BodyValidation {
        /// Hash of the header whose body failed validation.
        hash: B256,
        /// Block number of the header whose body failed validation.
        number: u64,
        /// The underlying consensus error.
        ///
        /// Marked as the error source, consistent with [`Self::HeaderValidation`], so that
        /// `Error::source()` exposes it.
        #[error(source)]
        error: Box<ConsensusError>,
    },
    /// Received more bodies than were requested.
    #[display("received more bodies than requested: {_0}")]
    TooManyBodies(GotExpected<usize>),
    /// A header is missing from the database.
    #[display("header missing from the database: {block_number}")]
    MissingHeader {
        /// Block number of the missing header.
        block_number: BlockNumber,
    },
    /// The requested body range is invalid.
    #[display("requested body range is invalid: {range:?}")]
    InvalidBodyRange {
        /// The invalid block number range.
        range: RangeInclusive<BlockNumber>,
    },

    /* ==================== COMMON ERRORS ==================== */
    /// Timed out while waiting for a response.
    #[display("timed out while waiting for response")]
    Timeout,
    /// Received an empty response.
    #[display("received empty response")]
    EmptyResponse,
    /// The underlying request failed.
    RequestError(RequestError),
    /// A provider error occurred.
    Provider(ProviderError),
}
impl From<DatabaseError> for DownloadError {
    /// Wraps the database error in [`ProviderError::Database`] and then in
    /// [`DownloadError::Provider`].
    fn from(db_error: DatabaseError) -> Self {
        let provider_error = ProviderError::Database(db_error);
        Self::Provider(provider_error)
    }
}
impl From<RequestError> for DownloadError {
    /// Wraps a request failure in [`DownloadError::RequestError`].
    fn from(request_error: RequestError) -> Self {
        Self::RequestError(request_error)
    }
}
impl From<ProviderError> for DownloadError {
    /// Wraps a provider failure in [`DownloadError::Provider`].
    fn from(provider_error: ProviderError) -> Self {
        Self::Provider(provider_error)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_consensus::Header;

    /// Builds a request starting at block number 0 with the given `limit` and the default
    /// direction.
    fn request_with_limit(limit: u64) -> HeadersRequest {
        HeadersRequest { start: 0u64.into(), limit, direction: Default::default() }
    }

    /// A successful response that contains no headers.
    fn empty_response() -> RequestResult<Vec<Header>> {
        Ok(Vec::new())
    }

    #[test]
    fn test_is_likely_bad_headers_response() {
        // Nothing requested and nothing returned: not a bad response.
        let zero_limit = request_with_limit(0);
        assert!(!empty_response().is_likely_bad_headers_response(&zero_limit));

        // One header requested but none returned: likely a bad response.
        let single_limit = request_with_limit(1);
        assert!(empty_response().is_likely_bad_headers_response(&single_limit));
    }
}