reth_metrics/common/
mpsc.rs
1use crate::Metrics;
5use futures::Stream;
6use metrics::Counter;
7use std::{
8 pin::Pin,
9 task::{ready, Context, Poll},
10};
11use tokio::sync::mpsc::{
12 self,
13 error::{SendError, TryRecvError, TrySendError},
14 OwnedPermit,
15};
16use tokio_util::sync::{PollSendError, PollSender};
17
18pub fn metered_unbounded_channel<T>(
20 scope: &'static str,
21) -> (UnboundedMeteredSender<T>, UnboundedMeteredReceiver<T>) {
22 let (tx, rx) = mpsc::unbounded_channel();
23 (UnboundedMeteredSender::new(tx, scope), UnboundedMeteredReceiver::new(rx, scope))
24}
25
26pub fn metered_channel<T>(
29 buffer: usize,
30 scope: &'static str,
31) -> (MeteredSender<T>, MeteredReceiver<T>) {
32 let (tx, rx) = mpsc::channel(buffer);
33 (MeteredSender::new(tx, scope), MeteredReceiver::new(rx, scope))
34}
35
36#[derive(Debug)]
38pub struct UnboundedMeteredSender<T> {
39 sender: mpsc::UnboundedSender<T>,
41 metrics: MeteredSenderMetrics,
43}
44
45impl<T> UnboundedMeteredSender<T> {
46 pub fn new(sender: mpsc::UnboundedSender<T>, scope: &'static str) -> Self {
49 Self { sender, metrics: MeteredSenderMetrics::new(scope) }
50 }
51
52 pub fn send(&self, message: T) -> Result<(), SendError<T>> {
56 match self.sender.send(message) {
57 Ok(()) => {
58 self.metrics.messages_sent_total.increment(1);
59 Ok(())
60 }
61 Err(error) => {
62 self.metrics.send_errors_total.increment(1);
63 Err(error)
64 }
65 }
66 }
67}
68
69impl<T> Clone for UnboundedMeteredSender<T> {
70 fn clone(&self) -> Self {
71 Self { sender: self.sender.clone(), metrics: self.metrics.clone() }
72 }
73}
74
75#[derive(Debug)]
77pub struct UnboundedMeteredReceiver<T> {
78 receiver: mpsc::UnboundedReceiver<T>,
80 metrics: MeteredReceiverMetrics,
82}
83
84impl<T> UnboundedMeteredReceiver<T> {
87 pub fn new(receiver: mpsc::UnboundedReceiver<T>, scope: &'static str) -> Self {
90 Self { receiver, metrics: MeteredReceiverMetrics::new(scope) }
91 }
92
93 pub async fn recv(&mut self) -> Option<T> {
95 let msg = self.receiver.recv().await;
96 if msg.is_some() {
97 self.metrics.messages_received_total.increment(1);
98 }
99 msg
100 }
101
102 pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
104 let msg = self.receiver.try_recv()?;
105 self.metrics.messages_received_total.increment(1);
106 Ok(msg)
107 }
108
109 pub fn close(&mut self) {
111 self.receiver.close();
112 }
113
114 pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
116 let msg = ready!(self.receiver.poll_recv(cx));
117 if msg.is_some() {
118 self.metrics.messages_received_total.increment(1);
119 }
120 Poll::Ready(msg)
121 }
122}
123
124impl<T> Stream for UnboundedMeteredReceiver<T> {
125 type Item = T;
126
127 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
128 self.poll_recv(cx)
129 }
130}
131
132#[derive(Debug)]
134pub struct MeteredSender<T> {
135 sender: mpsc::Sender<T>,
137 metrics: MeteredSenderMetrics,
139}
140
141impl<T> MeteredSender<T> {
142 pub fn new(sender: mpsc::Sender<T>, scope: &'static str) -> Self {
144 Self { sender, metrics: MeteredSenderMetrics::new(scope) }
145 }
146
147 pub fn try_reserve_owned(&self) -> Result<OwnedPermit<T>, TrySendError<mpsc::Sender<T>>> {
151 self.sender.clone().try_reserve_owned()
152 }
153
154 pub const fn inner(&self) -> &mpsc::Sender<T> {
156 &self.sender
157 }
158
159 pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
162 match self.sender.try_send(message) {
163 Ok(()) => {
164 self.metrics.messages_sent_total.increment(1);
165 Ok(())
166 }
167 Err(error) => {
168 self.metrics.send_errors_total.increment(1);
169 Err(error)
170 }
171 }
172 }
173
174 pub async fn send(&self, value: T) -> Result<(), SendError<T>> {
177 match self.sender.send(value).await {
178 Ok(()) => {
179 self.metrics.messages_sent_total.increment(1);
180 Ok(())
181 }
182 Err(error) => {
183 self.metrics.send_errors_total.increment(1);
184 Err(error)
185 }
186 }
187 }
188}
189
190impl<T> Clone for MeteredSender<T> {
191 fn clone(&self) -> Self {
192 Self { sender: self.sender.clone(), metrics: self.metrics.clone() }
193 }
194}
195
196#[derive(Debug)]
198pub struct MeteredReceiver<T> {
199 receiver: mpsc::Receiver<T>,
201 metrics: MeteredReceiverMetrics,
203}
204
205impl<T> MeteredReceiver<T> {
208 pub fn new(receiver: mpsc::Receiver<T>, scope: &'static str) -> Self {
210 Self { receiver, metrics: MeteredReceiverMetrics::new(scope) }
211 }
212
213 pub async fn recv(&mut self) -> Option<T> {
215 let msg = self.receiver.recv().await;
216 if msg.is_some() {
217 self.metrics.messages_received_total.increment(1);
218 }
219 msg
220 }
221
222 pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
224 let msg = self.receiver.try_recv()?;
225 self.metrics.messages_received_total.increment(1);
226 Ok(msg)
227 }
228
229 pub fn close(&mut self) {
231 self.receiver.close();
232 }
233
234 pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
236 let msg = ready!(self.receiver.poll_recv(cx));
237 if msg.is_some() {
238 self.metrics.messages_received_total.increment(1);
239 }
240 Poll::Ready(msg)
241 }
242}
243
244impl<T> Stream for MeteredReceiver<T> {
245 type Item = T;
246
247 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
248 self.poll_recv(cx)
249 }
250}
251
252#[derive(Clone, Metrics)]
254#[metrics(dynamic = true)]
255struct MeteredSenderMetrics {
256 messages_sent_total: Counter,
258 send_errors_total: Counter,
260}
261
262#[derive(Clone, Metrics)]
264#[metrics(dynamic = true)]
265struct MeteredReceiverMetrics {
266 messages_received_total: Counter,
268}
269
270#[derive(Debug)]
272pub struct MeteredPollSender<T> {
273 sender: PollSender<T>,
275 metrics: MeteredPollSenderMetrics,
277}
278
279impl<T: Send + 'static> MeteredPollSender<T> {
280 pub fn new(sender: PollSender<T>, scope: &'static str) -> Self {
282 Self { sender, metrics: MeteredPollSenderMetrics::new(scope) }
283 }
284
285 pub const fn inner(&self) -> &PollSender<T> {
287 &self.sender
288 }
289
290 pub fn poll_reserve(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), PollSendError<T>>> {
293 match self.sender.poll_reserve(cx) {
294 Poll::Ready(Ok(permit)) => Poll::Ready(Ok(permit)),
295 Poll::Ready(Err(error)) => Poll::Ready(Err(error)),
296 Poll::Pending => {
297 self.metrics.back_pressure_total.increment(1);
298 Poll::Pending
299 }
300 }
301 }
302
303 pub fn send_item(&mut self, item: T) -> Result<(), PollSendError<T>> {
306 match self.sender.send_item(item) {
307 Ok(()) => {
308 self.metrics.messages_sent_total.increment(1);
309 Ok(())
310 }
311 Err(error) => Err(error),
312 }
313 }
314}
315
316impl<T> Clone for MeteredPollSender<T> {
317 fn clone(&self) -> Self {
318 Self { sender: self.sender.clone(), metrics: self.metrics.clone() }
319 }
320}
321
322#[derive(Clone, Metrics)]
324#[metrics(dynamic = true)]
325struct MeteredPollSenderMetrics {
326 messages_sent_total: Counter,
328 back_pressure_total: Counter,
330}