1use crate::errors::PingerError;
2use std::{
3pin::Pin,
4 task::{Context, Poll},
5time::Duration,
6};
7use tokio::time::{Instant, Interval, Sleep};
8use tokio_stream::Stream;
910/// The pinger is a state machine that is created with a maximum number of pongs that can be
11/// missed.
12#[derive(Debug)]
13pub(crate) struct Pinger {
14/// The timer used for the next ping.
15ping_interval: Interval,
16/// The timer used for the next ping.
17timeout_timer: Pin<Box<Sleep>>,
18/// The timeout duration for each ping.
19timeout: Duration,
20/// Keeps track of the state
21state: PingState,
22}
2324// === impl Pinger ===
2526impl Pinger {
27/// Creates a new [`Pinger`] with the given ping interval duration,
28 /// and timeout duration.
29pub(crate) fn new(ping_interval: Duration, timeout_duration: Duration) -> Self {
30let now = Instant::now();
31let timeout_timer = tokio::time::sleep(timeout_duration);
32Self {
33 state: PingState::Ready,
34 ping_interval: tokio::time::interval_at(now + ping_interval, ping_interval),
35 timeout_timer: Box::pin(timeout_timer),
36 timeout: timeout_duration,
37 }
38 }
3940/// Mark a pong as received, and transition the pinger to the `Ready` state if it was in the
41 /// `WaitingForPong` state. Unsets the sleep timer.
42pub(crate) fn on_pong(&mut self) -> Result<(), PingerError> {
43match self.state {
44PingState::Ready => Err(PingerError::UnexpectedPong),
45PingState::WaitingForPong => {
46self.state = PingState::Ready;
47self.ping_interval.reset();
48Ok(())
49 }
50PingState::TimedOut => {
51// if we receive a pong after timeout then we also reset the state, since the
52 // connection was kept alive after timeout
53self.state = PingState::Ready;
54self.ping_interval.reset();
55Ok(())
56 }
57 }
58 }
5960/// Returns the current state of the pinger.
61pub(crate) const fn state(&self) -> PingState {
62self.state
63 }
6465/// Polls the state of the pinger and returns whether a new ping needs to be sent or if a
66 /// previous ping timed out.
67pub(crate) fn poll_ping(
68&mut self,
69 cx: &mut Context<'_>,
70 ) -> Poll<Result<PingerEvent, PingerError>> {
71match self.state() {
72PingState::Ready => {
73if self.ping_interval.poll_tick(cx).is_ready() {
74self.timeout_timer.as_mut().reset(Instant::now() + self.timeout);
75self.state = PingState::WaitingForPong;
76return Poll::Ready(Ok(PingerEvent::Ping))
77 }
78 }
79PingState::WaitingForPong => {
80if self.timeout_timer.is_elapsed() {
81self.state = PingState::TimedOut;
82return Poll::Ready(Ok(PingerEvent::Timeout))
83 }
84 }
85PingState::TimedOut => {
86// we treat continuous calls while in timeout as pending, since the connection is
87 // not yet terminated
88return Poll::Pending89 }
90 };
91Poll::Pending92 }
93}
9495impl Stream for Pinger {
96type Item = Result<PingerEvent, PingerError>;
9798fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
99self.get_mut().poll_ping(cx).map(Some)
100 }
101}
102103/// This represents the possible states of the pinger.
104#[derive(Debug, Clone, Copy, PartialEq, Eq)]
105pub(crate) enum PingState {
106/// There are no pings in flight, or all pings have been responded to, and we are ready to send
107 /// a ping at a later point.
108Ready,
109/// We have sent a ping and are waiting for a pong, but the peer has missed n pongs.
110WaitingForPong,
111/// The peer has failed to respond to a ping.
112TimedOut,
113}
114115/// The element type produced by a [`Pinger`], representing either a new
116/// [`Ping`](super::P2PMessage::Ping)
117/// message to send, or an indication that the peer should be timed out.
118#[derive(Debug, Clone, PartialEq, Eq)]
119pub(crate) enum PingerEvent {
120/// A new [`Ping`](super::P2PMessage::Ping) message should be sent.
121Ping,
122123/// The peer should be timed out.
124Timeout,
125}
126127#[cfg(test)]
128mod tests {
129use super::*;
130use futures::StreamExt;
131132#[tokio::test]
133async fn test_ping_timeout() {
134let interval = Duration::from_millis(300);
135// we should wait for the interval to elapse and receive a pong before the timeout elapses
136let mut pinger = Pinger::new(interval, Duration::from_millis(20));
137assert_eq!(pinger.next().await.unwrap().unwrap(), PingerEvent::Ping);
138 pinger.on_pong().unwrap();
139assert_eq!(pinger.next().await.unwrap().unwrap(), PingerEvent::Ping);
140141 tokio::time::sleep(interval).await;
142assert_eq!(pinger.next().await.unwrap().unwrap(), PingerEvent::Timeout);
143 pinger.on_pong().unwrap();
144145assert_eq!(pinger.next().await.unwrap().unwrap(), PingerEvent::Ping);
146 }
147}