reth_network/session/
counter.rs

1use super::ExceedsSessionLimit;
2use reth_network_api::Direction;
3use reth_network_types::SessionLimits;
4
5/// Keeps track of all sessions.
6#[derive(Debug, Clone)]
7pub struct SessionCounter {
8    /// Limits to enforce.
9    limits: SessionLimits,
10    /// Number of pending incoming sessions.
11    pending_inbound: u32,
12    /// Number of pending outgoing sessions.
13    pending_outbound: u32,
14    /// Number of active inbound sessions.
15    active_inbound: u32,
16    /// Number of active outbound sessions.
17    active_outbound: u32,
18}
19
20// === impl SessionCounter ===
21
22impl SessionCounter {
23    pub(crate) const fn new(limits: SessionLimits) -> Self {
24        Self {
25            limits,
26            pending_inbound: 0,
27            pending_outbound: 0,
28            active_inbound: 0,
29            active_outbound: 0,
30        }
31    }
32
33    pub(crate) fn inc_pending_inbound(&mut self) {
34        self.pending_inbound += 1;
35    }
36
37    pub(crate) fn inc_pending_outbound(&mut self) {
38        self.pending_outbound += 1;
39    }
40
41    pub(crate) fn dec_pending(&mut self, direction: &Direction) {
42        match direction {
43            Direction::Outgoing(_) => {
44                self.pending_outbound -= 1;
45            }
46            Direction::Incoming => {
47                self.pending_inbound -= 1;
48            }
49        }
50    }
51
52    pub(crate) fn inc_active(&mut self, direction: &Direction) {
53        match direction {
54            Direction::Outgoing(_) => {
55                self.active_outbound += 1;
56            }
57            Direction::Incoming => {
58                self.active_inbound += 1;
59            }
60        }
61    }
62
63    pub(crate) fn dec_active(&mut self, direction: &Direction) {
64        match direction {
65            Direction::Outgoing(_) => {
66                self.active_outbound -= 1;
67            }
68            Direction::Incoming => {
69                self.active_inbound -= 1;
70            }
71        }
72    }
73
74    pub(crate) const fn ensure_pending_outbound(&self) -> Result<(), ExceedsSessionLimit> {
75        Self::ensure(self.pending_outbound, self.limits.max_pending_outbound)
76    }
77
78    pub(crate) const fn ensure_pending_inbound(&self) -> Result<(), ExceedsSessionLimit> {
79        Self::ensure(self.pending_inbound, self.limits.max_pending_inbound)
80    }
81
82    const fn ensure(current: u32, limit: Option<u32>) -> Result<(), ExceedsSessionLimit> {
83        if let Some(limit) = limit {
84            if current >= limit {
85                return Err(ExceedsSessionLimit(limit))
86            }
87        }
88        Ok(())
89    }
90}
91
92#[cfg(test)]
93mod tests {
94    use super::*;
95
96    #[test]
97    fn test_limits() {
98        let mut limits = SessionCounter::new(SessionLimits::default().with_max_pending_inbound(2));
99        assert!(limits.ensure_pending_outbound().is_ok());
100        limits.inc_pending_inbound();
101        assert!(limits.ensure_pending_inbound().is_ok());
102        limits.inc_pending_inbound();
103        assert!(limits.ensure_pending_inbound().is_err());
104    }
105}