use alloy_primitives::bytes::BytesMut;
use futures::Stream;
use reth_eth_wire::{
capability::SharedCapabilities, multiplex::ProtocolConnection, protocol::Protocol,
};
use reth_network_api::{Direction, PeerId};
use std::{
fmt,
net::SocketAddr,
ops::{Deref, DerefMut},
pin::Pin,
};
pub trait ProtocolHandler: fmt::Debug + Send + Sync + 'static {
type ConnectionHandler: ConnectionHandler;
fn on_incoming(&self, socket_addr: SocketAddr) -> Option<Self::ConnectionHandler>;
fn on_outgoing(
&self,
socket_addr: SocketAddr,
peer_id: PeerId,
) -> Option<Self::ConnectionHandler>;
}
pub trait ConnectionHandler: Send + Sync + 'static {
type Connection: Stream<Item = BytesMut> + Send + 'static;
fn protocol(&self) -> Protocol;
fn on_unsupported_by_peer(
self,
supported: &SharedCapabilities,
direction: Direction,
peer_id: PeerId,
) -> OnNotSupported;
fn into_connection(
self,
direction: Direction,
peer_id: PeerId,
conn: ProtocolConnection,
) -> Self::Connection;
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum OnNotSupported {
#[default]
KeepAlive,
Disconnect,
}
#[derive(Debug)]
pub struct RlpxSubProtocol(Box<dyn DynProtocolHandler>);
pub trait IntoRlpxSubProtocol {
fn into_rlpx_sub_protocol(self) -> RlpxSubProtocol;
}
impl<T> IntoRlpxSubProtocol for T
where
T: ProtocolHandler + Send + Sync + 'static,
{
fn into_rlpx_sub_protocol(self) -> RlpxSubProtocol {
RlpxSubProtocol(Box::new(self))
}
}
impl IntoRlpxSubProtocol for RlpxSubProtocol {
fn into_rlpx_sub_protocol(self) -> RlpxSubProtocol {
self
}
}
#[derive(Debug, Default)]
pub struct RlpxSubProtocols {
protocols: Vec<RlpxSubProtocol>,
}
impl RlpxSubProtocols {
pub fn push(&mut self, protocol: impl IntoRlpxSubProtocol) {
self.protocols.push(protocol.into_rlpx_sub_protocol());
}
pub(crate) fn on_incoming(&self, socket_addr: SocketAddr) -> RlpxSubProtocolHandlers {
RlpxSubProtocolHandlers(
self.protocols
.iter()
.filter_map(|protocol| protocol.0.on_incoming(socket_addr))
.collect(),
)
}
pub(crate) fn on_outgoing(
&self,
socket_addr: SocketAddr,
peer_id: PeerId,
) -> RlpxSubProtocolHandlers {
RlpxSubProtocolHandlers(
self.protocols
.iter()
.filter_map(|protocol| protocol.0.on_outgoing(socket_addr, peer_id))
.collect(),
)
}
}
#[derive(Default)]
pub(crate) struct RlpxSubProtocolHandlers(Vec<Box<dyn DynConnectionHandler>>);
impl RlpxSubProtocolHandlers {
pub(crate) fn into_iter(self) -> impl Iterator<Item = Box<dyn DynConnectionHandler>> {
self.0.into_iter()
}
}
impl Deref for RlpxSubProtocolHandlers {
type Target = Vec<Box<dyn DynConnectionHandler>>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl DerefMut for RlpxSubProtocolHandlers {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
pub(crate) trait DynProtocolHandler: fmt::Debug + Send + Sync + 'static {
fn on_incoming(&self, socket_addr: SocketAddr) -> Option<Box<dyn DynConnectionHandler>>;
fn on_outgoing(
&self,
socket_addr: SocketAddr,
peer_id: PeerId,
) -> Option<Box<dyn DynConnectionHandler>>;
}
impl<T: ProtocolHandler> DynProtocolHandler for T {
fn on_incoming(&self, socket_addr: SocketAddr) -> Option<Box<dyn DynConnectionHandler>> {
T::on_incoming(self, socket_addr)
.map(|handler| Box::new(handler) as Box<dyn DynConnectionHandler>)
}
fn on_outgoing(
&self,
socket_addr: SocketAddr,
peer_id: PeerId,
) -> Option<Box<dyn DynConnectionHandler>> {
T::on_outgoing(self, socket_addr, peer_id)
.map(|handler| Box::new(handler) as Box<dyn DynConnectionHandler>)
}
}
pub(crate) trait DynConnectionHandler: Send + Sync + 'static {
fn protocol(&self) -> Protocol;
fn into_connection(
self: Box<Self>,
direction: Direction,
peer_id: PeerId,
conn: ProtocolConnection,
) -> Pin<Box<dyn Stream<Item = BytesMut> + Send + 'static>>;
}
impl<T: ConnectionHandler> DynConnectionHandler for T {
fn protocol(&self) -> Protocol {
T::protocol(self)
}
fn into_connection(
self: Box<Self>,
direction: Direction,
peer_id: PeerId,
conn: ProtocolConnection,
) -> Pin<Box<dyn Stream<Item = BytesMut> + Send + 'static>> {
Box::pin(T::into_connection(*self, direction, peer_id, conn))
}
}