reth_metrics/common/
mpsc.rs

1//! Support for metering senders. Facilitates debugging by exposing metrics for number of messages
2//! sent, number of errors, etc.
3
4use crate::Metrics;
5use futures::Stream;
6use metrics::Counter;
7use std::{
8    pin::Pin,
9    task::{ready, Context, Poll},
10};
11use tokio::sync::mpsc::{
12    self,
13    error::{SendError, TryRecvError, TrySendError},
14    OwnedPermit,
15};
16use tokio_util::sync::{PollSendError, PollSender};
17
18/// Wrapper around [`mpsc::unbounded_channel`] that returns a new unbounded metered channel.
19pub fn metered_unbounded_channel<T>(
20    scope: &'static str,
21) -> (UnboundedMeteredSender<T>, UnboundedMeteredReceiver<T>) {
22    let (tx, rx) = mpsc::unbounded_channel();
23    (UnboundedMeteredSender::new(tx, scope), UnboundedMeteredReceiver::new(rx, scope))
24}
25
26/// Wrapper around [`mpsc::channel`] that returns a new bounded metered channel with the given
27/// buffer size.
28pub fn metered_channel<T>(
29    buffer: usize,
30    scope: &'static str,
31) -> (MeteredSender<T>, MeteredReceiver<T>) {
32    let (tx, rx) = mpsc::channel(buffer);
33    (MeteredSender::new(tx, scope), MeteredReceiver::new(rx, scope))
34}
35
36/// A wrapper type around [`UnboundedSender`](mpsc::UnboundedSender) that updates metrics on send.
37#[derive(Debug)]
38pub struct UnboundedMeteredSender<T> {
39    /// The [`UnboundedSender`](mpsc::UnboundedSender) that this wraps around
40    sender: mpsc::UnboundedSender<T>,
41    /// Holds metrics for this type
42    metrics: MeteredSenderMetrics,
43}
44
45impl<T> UnboundedMeteredSender<T> {
46    /// Creates a new [`MeteredSender`] wrapping around the provided  that updates metrics on send.
47    // #[derive(Debug)]
48    pub fn new(sender: mpsc::UnboundedSender<T>, scope: &'static str) -> Self {
49        Self { sender, metrics: MeteredSenderMetrics::new(scope) }
50    }
51
52    /// Calls the underlying  that updates metrics on send.
53    // #[derive(Debug)]'s `try_send`, incrementing the appropriate
54    /// metrics depending on the result.
55    pub fn send(&self, message: T) -> Result<(), SendError<T>> {
56        match self.sender.send(message) {
57            Ok(()) => {
58                self.metrics.messages_sent_total.increment(1);
59                Ok(())
60            }
61            Err(error) => {
62                self.metrics.send_errors_total.increment(1);
63                Err(error)
64            }
65        }
66    }
67}
68
69impl<T> Clone for UnboundedMeteredSender<T> {
70    fn clone(&self) -> Self {
71        Self { sender: self.sender.clone(), metrics: self.metrics.clone() }
72    }
73}
74
75/// A wrapper type around [Receiver](mpsc::UnboundedReceiver) that updates metrics on receive.
76#[derive(Debug)]
77pub struct UnboundedMeteredReceiver<T> {
78    /// The [Sender](mpsc::Sender) that this wraps around
79    receiver: mpsc::UnboundedReceiver<T>,
80    /// Holds metrics for this type
81    metrics: MeteredReceiverMetrics,
82}
83
84// === impl MeteredReceiver ===
85
86impl<T> UnboundedMeteredReceiver<T> {
87    /// Creates a new [`UnboundedMeteredReceiver`] wrapping around the provided
88    /// [Receiver](mpsc::UnboundedReceiver)
89    pub fn new(receiver: mpsc::UnboundedReceiver<T>, scope: &'static str) -> Self {
90        Self { receiver, metrics: MeteredReceiverMetrics::new(scope) }
91    }
92
93    /// Receives the next value for this receiver.
94    pub async fn recv(&mut self) -> Option<T> {
95        let msg = self.receiver.recv().await;
96        if msg.is_some() {
97            self.metrics.messages_received_total.increment(1);
98        }
99        msg
100    }
101
102    /// Tries to receive the next value for this receiver.
103    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
104        let msg = self.receiver.try_recv()?;
105        self.metrics.messages_received_total.increment(1);
106        Ok(msg)
107    }
108
109    /// Closes the receiving half of a channel without dropping it.
110    pub fn close(&mut self) {
111        self.receiver.close();
112    }
113
114    /// Polls to receive the next message on this channel.
115    pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
116        let msg = ready!(self.receiver.poll_recv(cx));
117        if msg.is_some() {
118            self.metrics.messages_received_total.increment(1);
119        }
120        Poll::Ready(msg)
121    }
122}
123
124impl<T> Stream for UnboundedMeteredReceiver<T> {
125    type Item = T;
126
127    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
128        self.poll_recv(cx)
129    }
130}
131
132/// A wrapper type around [Sender](mpsc::Sender) that updates metrics on send.
133#[derive(Debug)]
134pub struct MeteredSender<T> {
135    /// The [Sender](mpsc::Sender) that this wraps around
136    sender: mpsc::Sender<T>,
137    /// Holds metrics for this type
138    metrics: MeteredSenderMetrics,
139}
140
141impl<T> MeteredSender<T> {
142    /// Creates a new [`MeteredSender`] wrapping around the provided [Sender](mpsc::Sender)
143    pub fn new(sender: mpsc::Sender<T>, scope: &'static str) -> Self {
144        Self { sender, metrics: MeteredSenderMetrics::new(scope) }
145    }
146
147    /// Tries to acquire a permit to send a message.
148    ///
149    /// See also [Sender](mpsc::Sender)'s `try_reserve_owned`.
150    pub fn try_reserve_owned(&self) -> Result<OwnedPermit<T>, TrySendError<mpsc::Sender<T>>> {
151        self.sender.clone().try_reserve_owned()
152    }
153
154    /// Returns the underlying [Sender](mpsc::Sender).
155    pub const fn inner(&self) -> &mpsc::Sender<T> {
156        &self.sender
157    }
158
159    /// Calls the underlying [Sender](mpsc::Sender)'s `try_send`, incrementing the appropriate
160    /// metrics depending on the result.
161    pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
162        match self.sender.try_send(message) {
163            Ok(()) => {
164                self.metrics.messages_sent_total.increment(1);
165                Ok(())
166            }
167            Err(error) => {
168                self.metrics.send_errors_total.increment(1);
169                Err(error)
170            }
171        }
172    }
173
174    /// Calls the underlying [Sender](mpsc::Sender)'s `send`, incrementing the appropriate
175    /// metrics depending on the result.
176    pub async fn send(&self, value: T) -> Result<(), SendError<T>> {
177        match self.sender.send(value).await {
178            Ok(()) => {
179                self.metrics.messages_sent_total.increment(1);
180                Ok(())
181            }
182            Err(error) => {
183                self.metrics.send_errors_total.increment(1);
184                Err(error)
185            }
186        }
187    }
188}
189
190impl<T> Clone for MeteredSender<T> {
191    fn clone(&self) -> Self {
192        Self { sender: self.sender.clone(), metrics: self.metrics.clone() }
193    }
194}
195
196/// A wrapper type around [Receiver](mpsc::Receiver) that updates metrics on receive.
197#[derive(Debug)]
198pub struct MeteredReceiver<T> {
199    /// The [Sender](mpsc::Sender) that this wraps around
200    receiver: mpsc::Receiver<T>,
201    /// Holds metrics for this type
202    metrics: MeteredReceiverMetrics,
203}
204
205// === impl MeteredReceiver ===
206
207impl<T> MeteredReceiver<T> {
208    /// Creates a new [`MeteredReceiver`] wrapping around the provided [Receiver](mpsc::Receiver)
209    pub fn new(receiver: mpsc::Receiver<T>, scope: &'static str) -> Self {
210        Self { receiver, metrics: MeteredReceiverMetrics::new(scope) }
211    }
212
213    /// Receives the next value for this receiver.
214    pub async fn recv(&mut self) -> Option<T> {
215        let msg = self.receiver.recv().await;
216        if msg.is_some() {
217            self.metrics.messages_received_total.increment(1);
218        }
219        msg
220    }
221
222    /// Tries to receive the next value for this receiver.
223    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
224        let msg = self.receiver.try_recv()?;
225        self.metrics.messages_received_total.increment(1);
226        Ok(msg)
227    }
228
229    /// Closes the receiving half of a channel without dropping it.
230    pub fn close(&mut self) {
231        self.receiver.close();
232    }
233
234    /// Polls to receive the next message on this channel.
235    pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
236        let msg = ready!(self.receiver.poll_recv(cx));
237        if msg.is_some() {
238            self.metrics.messages_received_total.increment(1);
239        }
240        Poll::Ready(msg)
241    }
242}
243
244impl<T> Stream for MeteredReceiver<T> {
245    type Item = T;
246
247    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
248        self.poll_recv(cx)
249    }
250}
251
252/// Throughput metrics for [`MeteredSender`]
253#[derive(Clone, Metrics)]
254#[metrics(dynamic = true)]
255struct MeteredSenderMetrics {
256    /// Number of messages sent
257    messages_sent_total: Counter,
258    /// Number of failed message deliveries
259    send_errors_total: Counter,
260}
261
262/// Throughput metrics for [`MeteredReceiver`]
263#[derive(Clone, Metrics)]
264#[metrics(dynamic = true)]
265struct MeteredReceiverMetrics {
266    /// Number of messages received
267    messages_received_total: Counter,
268}
269
270/// A wrapper type around [`PollSender`] that updates metrics on send.
271#[derive(Debug)]
272pub struct MeteredPollSender<T> {
273    /// The [`PollSender`] that this wraps around.
274    sender: PollSender<T>,
275    /// Holds metrics for this type.
276    metrics: MeteredPollSenderMetrics,
277}
278
279impl<T: Send + 'static> MeteredPollSender<T> {
280    /// Creates a new [`MeteredPollSender`] wrapping around the provided [`PollSender`].
281    pub fn new(sender: PollSender<T>, scope: &'static str) -> Self {
282        Self { sender, metrics: MeteredPollSenderMetrics::new(scope) }
283    }
284
285    /// Returns the underlying [`PollSender`].
286    pub const fn inner(&self) -> &PollSender<T> {
287        &self.sender
288    }
289
290    /// Calls the underlying [`PollSender`]'s `poll_reserve`, incrementing the appropriate
291    /// metrics depending on the result.
292    pub fn poll_reserve(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), PollSendError<T>>> {
293        match self.sender.poll_reserve(cx) {
294            Poll::Ready(Ok(permit)) => Poll::Ready(Ok(permit)),
295            Poll::Ready(Err(error)) => Poll::Ready(Err(error)),
296            Poll::Pending => {
297                self.metrics.back_pressure_total.increment(1);
298                Poll::Pending
299            }
300        }
301    }
302
303    /// Calls the underlying [`PollSender`]'s `send_item`, incrementing the appropriate
304    /// metrics depending on the result.
305    pub fn send_item(&mut self, item: T) -> Result<(), PollSendError<T>> {
306        match self.sender.send_item(item) {
307            Ok(()) => {
308                self.metrics.messages_sent_total.increment(1);
309                Ok(())
310            }
311            Err(error) => Err(error),
312        }
313    }
314}
315
316impl<T> Clone for MeteredPollSender<T> {
317    fn clone(&self) -> Self {
318        Self { sender: self.sender.clone(), metrics: self.metrics.clone() }
319    }
320}
321
322/// Throughput metrics for [`MeteredPollSender`]
323#[derive(Clone, Metrics)]
324#[metrics(dynamic = true)]
325struct MeteredPollSenderMetrics {
326    /// Number of messages sent
327    messages_sent_total: Counter,
328    /// Number of delayed message deliveries caused by a full channel
329    back_pressure_total: Counter,
330}