reth_metrics/common/
mpsc.rs

1//! Support for metering senders. Facilitates debugging by exposing metrics for number of messages
2//! sent, number of errors, etc.
3
4use crate::Metrics;
5use futures::Stream;
6use metrics::Counter;
7use std::{
8    pin::Pin,
9    task::{ready, Context, Poll},
10};
11use tokio::sync::mpsc::{
12    self,
13    error::{SendError, TryRecvError, TrySendError},
14};
15use tokio_util::sync::{PollSendError, PollSender};
16
17/// Wrapper around [`mpsc::unbounded_channel`] that returns a new unbounded metered channel.
18pub fn metered_unbounded_channel<T>(
19    scope: &'static str,
20) -> (UnboundedMeteredSender<T>, UnboundedMeteredReceiver<T>) {
21    let (tx, rx) = mpsc::unbounded_channel();
22    (UnboundedMeteredSender::new(tx, scope), UnboundedMeteredReceiver::new(rx, scope))
23}
24
25/// Wrapper around [`mpsc::channel`] that returns a new bounded metered channel with the given
26/// buffer size.
27pub fn metered_channel<T>(
28    buffer: usize,
29    scope: &'static str,
30) -> (MeteredSender<T>, MeteredReceiver<T>) {
31    let (tx, rx) = mpsc::channel(buffer);
32    (MeteredSender::new(tx, scope), MeteredReceiver::new(rx, scope))
33}
34
35/// A wrapper type around [`UnboundedSender`](mpsc::UnboundedSender) that updates metrics on send.
36#[derive(Debug)]
37pub struct UnboundedMeteredSender<T> {
38    /// The [`UnboundedSender`](mpsc::UnboundedSender) that this wraps around
39    sender: mpsc::UnboundedSender<T>,
40    /// Holds metrics for this type
41    metrics: MeteredSenderMetrics,
42}
43
44impl<T> UnboundedMeteredSender<T> {
45    /// Creates a new [`MeteredSender`] wrapping around the provided  that updates metrics on send.
46    // #[derive(Debug)]
47    pub fn new(sender: mpsc::UnboundedSender<T>, scope: &'static str) -> Self {
48        Self { sender, metrics: MeteredSenderMetrics::new(scope) }
49    }
50
51    /// Calls the underlying  that updates metrics on send.
52    // #[derive(Debug)]'s `try_send`, incrementing the appropriate
53    /// metrics depending on the result.
54    pub fn send(&self, message: T) -> Result<(), SendError<T>> {
55        match self.sender.send(message) {
56            Ok(()) => {
57                self.metrics.messages_sent_total.increment(1);
58                Ok(())
59            }
60            Err(error) => {
61                self.metrics.send_errors_total.increment(1);
62                Err(error)
63            }
64        }
65    }
66}
67
68impl<T> Clone for UnboundedMeteredSender<T> {
69    fn clone(&self) -> Self {
70        Self { sender: self.sender.clone(), metrics: self.metrics.clone() }
71    }
72}
73
74/// A wrapper type around [Receiver](mpsc::UnboundedReceiver) that updates metrics on receive.
75#[derive(Debug)]
76pub struct UnboundedMeteredReceiver<T> {
77    /// The [Sender](mpsc::Sender) that this wraps around
78    receiver: mpsc::UnboundedReceiver<T>,
79    /// Holds metrics for this type
80    metrics: MeteredReceiverMetrics,
81}
82
83// === impl MeteredReceiver ===
84
85impl<T> UnboundedMeteredReceiver<T> {
86    /// Creates a new [`UnboundedMeteredReceiver`] wrapping around the provided
87    /// [Receiver](mpsc::UnboundedReceiver)
88    pub fn new(receiver: mpsc::UnboundedReceiver<T>, scope: &'static str) -> Self {
89        Self { receiver, metrics: MeteredReceiverMetrics::new(scope) }
90    }
91
92    /// Receives the next value for this receiver.
93    pub async fn recv(&mut self) -> Option<T> {
94        let msg = self.receiver.recv().await;
95        if msg.is_some() {
96            self.metrics.messages_received_total.increment(1);
97        }
98        msg
99    }
100
101    /// Tries to receive the next value for this receiver.
102    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
103        let msg = self.receiver.try_recv()?;
104        self.metrics.messages_received_total.increment(1);
105        Ok(msg)
106    }
107
108    /// Closes the receiving half of a channel without dropping it.
109    pub fn close(&mut self) {
110        self.receiver.close();
111    }
112
113    /// Polls to receive the next message on this channel.
114    pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
115        let msg = ready!(self.receiver.poll_recv(cx));
116        if msg.is_some() {
117            self.metrics.messages_received_total.increment(1);
118        }
119        Poll::Ready(msg)
120    }
121}
122
123impl<T> Stream for UnboundedMeteredReceiver<T> {
124    type Item = T;
125
126    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
127        self.poll_recv(cx)
128    }
129}
130
131/// A wrapper type around [Sender](mpsc::Sender) that updates metrics on send.
132#[derive(Debug)]
133pub struct MeteredSender<T> {
134    /// The [Sender](mpsc::Sender) that this wraps around
135    sender: mpsc::Sender<T>,
136    /// Holds metrics for this type
137    metrics: MeteredSenderMetrics,
138}
139
140impl<T> MeteredSender<T> {
141    /// Creates a new [`MeteredSender`] wrapping around the provided [Sender](mpsc::Sender)
142    pub fn new(sender: mpsc::Sender<T>, scope: &'static str) -> Self {
143        Self { sender, metrics: MeteredSenderMetrics::new(scope) }
144    }
145
146    /// Tries to acquire a permit to send a message without waiting.
147    ///
148    /// See also [Sender](mpsc::Sender)'s `try_reserve_owned`.
149    pub fn try_reserve_owned(self) -> Result<OwnedPermit<T>, TrySendError<Self>> {
150        let Self { sender, metrics } = self;
151        sender.try_reserve_owned().map(|permit| OwnedPermit::new(permit, metrics.clone())).map_err(
152            |err| match err {
153                TrySendError::Full(sender) => TrySendError::Full(Self { sender, metrics }),
154                TrySendError::Closed(sender) => TrySendError::Closed(Self { sender, metrics }),
155            },
156        )
157    }
158
159    /// Waits to acquire a permit to send a message and return owned permit.
160    ///
161    /// See also [Sender](mpsc::Sender)'s `reserve_owned`.
162    pub async fn reserve_owned(self) -> Result<OwnedPermit<T>, SendError<()>> {
163        self.sender.reserve_owned().await.map(|permit| OwnedPermit::new(permit, self.metrics))
164    }
165
166    /// Waits to acquire a permit to send a message.
167    ///
168    /// See also [Sender](mpsc::Sender)'s `reserve`.
169    pub async fn reserve(&self) -> Result<Permit<'_, T>, SendError<()>> {
170        self.sender.reserve().await.map(|permit| Permit::new(permit, &self.metrics))
171    }
172
173    /// Tries to acquire a permit to send a message without waiting.
174    ///
175    /// See also [Sender](mpsc::Sender)'s `try_reserve`.
176    pub fn try_reserve(&self) -> Result<Permit<'_, T>, TrySendError<()>> {
177        self.sender.try_reserve().map(|permit| Permit::new(permit, &self.metrics))
178    }
179
180    /// Returns the underlying [Sender](mpsc::Sender).
181    pub const fn inner(&self) -> &mpsc::Sender<T> {
182        &self.sender
183    }
184
185    /// Calls the underlying [Sender](mpsc::Sender)'s `try_send`, incrementing the appropriate
186    /// metrics depending on the result.
187    pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
188        match self.sender.try_send(message) {
189            Ok(()) => {
190                self.metrics.messages_sent_total.increment(1);
191                Ok(())
192            }
193            Err(error) => {
194                self.metrics.send_errors_total.increment(1);
195                Err(error)
196            }
197        }
198    }
199
200    /// Calls the underlying [Sender](mpsc::Sender)'s `send`, incrementing the appropriate
201    /// metrics depending on the result.
202    pub async fn send(&self, value: T) -> Result<(), SendError<T>> {
203        match self.sender.send(value).await {
204            Ok(()) => {
205                self.metrics.messages_sent_total.increment(1);
206                Ok(())
207            }
208            Err(error) => {
209                self.metrics.send_errors_total.increment(1);
210                Err(error)
211            }
212        }
213    }
214}
215
216impl<T> Clone for MeteredSender<T> {
217    fn clone(&self) -> Self {
218        Self { sender: self.sender.clone(), metrics: self.metrics.clone() }
219    }
220}
221
222/// A wrapper type around [`OwnedPermit`](mpsc::OwnedPermit) that updates metrics accounting
223/// when sending
224#[derive(Debug)]
225pub struct OwnedPermit<T> {
226    permit: mpsc::OwnedPermit<T>,
227    /// Holds metrics for this type
228    metrics: MeteredSenderMetrics,
229}
230
231impl<T> OwnedPermit<T> {
232    /// Creates a new [`OwnedPermit`] wrapping the provided [`mpsc::OwnedPermit`] with given metrics
233    /// handle.
234    pub const fn new(permit: mpsc::OwnedPermit<T>, metrics: MeteredSenderMetrics) -> Self {
235        Self { permit, metrics }
236    }
237
238    /// Sends a value using the reserved capacity and update metrics accordingly.
239    pub fn send(self, value: T) -> MeteredSender<T> {
240        let Self { permit, metrics } = self;
241        metrics.messages_sent_total.increment(1);
242        MeteredSender { sender: permit.send(value), metrics }
243    }
244}
245
246/// A wrapper type around [Permit](mpsc::Permit) that updates metrics accounting
247/// when sending
248#[derive(Debug)]
249pub struct Permit<'a, T> {
250    permit: mpsc::Permit<'a, T>,
251    metrics_ref: &'a MeteredSenderMetrics,
252}
253
254impl<'a, T> Permit<'a, T> {
255    /// Creates a new [`Permit`] wrapping the provided [`mpsc::Permit`] with given metrics ref.
256    pub const fn new(permit: mpsc::Permit<'a, T>, metrics_ref: &'a MeteredSenderMetrics) -> Self {
257        Self { permit, metrics_ref }
258    }
259
260    /// Sends a value using the reserved capacity and updates metrics accordingly.
261    pub fn send(self, value: T) {
262        self.metrics_ref.messages_sent_total.increment(1);
263        self.permit.send(value);
264    }
265}
266
267/// A wrapper type around [Receiver](mpsc::Receiver) that updates metrics on receive.
268#[derive(Debug)]
269pub struct MeteredReceiver<T> {
270    /// The [Receiver](mpsc::Receiver) that this wraps around
271    receiver: mpsc::Receiver<T>,
272    /// Holds metrics for this type
273    metrics: MeteredReceiverMetrics,
274}
275
276// === impl MeteredReceiver ===
277
278impl<T> MeteredReceiver<T> {
279    /// Creates a new [`MeteredReceiver`] wrapping around the provided [Receiver](mpsc::Receiver)
280    pub fn new(receiver: mpsc::Receiver<T>, scope: &'static str) -> Self {
281        Self { receiver, metrics: MeteredReceiverMetrics::new(scope) }
282    }
283
284    /// Receives the next value for this receiver.
285    pub async fn recv(&mut self) -> Option<T> {
286        let msg = self.receiver.recv().await;
287        if msg.is_some() {
288            self.metrics.messages_received_total.increment(1);
289        }
290        msg
291    }
292
293    /// Tries to receive the next value for this receiver.
294    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
295        let msg = self.receiver.try_recv()?;
296        self.metrics.messages_received_total.increment(1);
297        Ok(msg)
298    }
299
300    /// Closes the receiving half of a channel without dropping it.
301    pub fn close(&mut self) {
302        self.receiver.close();
303    }
304
305    /// Polls to receive the next message on this channel.
306    pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
307        let msg = ready!(self.receiver.poll_recv(cx));
308        if msg.is_some() {
309            self.metrics.messages_received_total.increment(1);
310        }
311        Poll::Ready(msg)
312    }
313}
314
315impl<T> Stream for MeteredReceiver<T> {
316    type Item = T;
317
318    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
319        self.poll_recv(cx)
320    }
321}
322
323/// Throughput metrics for [`MeteredSender`]
324#[derive(Clone, Metrics)]
325#[metrics(dynamic = true)]
326pub struct MeteredSenderMetrics {
327    /// Number of messages sent
328    messages_sent_total: Counter,
329    /// Number of failed message deliveries
330    send_errors_total: Counter,
331}
332
333/// Throughput metrics for [`MeteredReceiver`]
334#[derive(Clone, Metrics)]
335#[metrics(dynamic = true)]
336struct MeteredReceiverMetrics {
337    /// Number of messages received
338    messages_received_total: Counter,
339}
340
341/// A wrapper type around [`PollSender`] that updates metrics on send.
342#[derive(Debug)]
343pub struct MeteredPollSender<T> {
344    /// The [`PollSender`] that this wraps around.
345    sender: PollSender<T>,
346    /// Holds metrics for this type.
347    metrics: MeteredPollSenderMetrics,
348}
349
350impl<T: Send + 'static> MeteredPollSender<T> {
351    /// Creates a new [`MeteredPollSender`] wrapping around the provided [`PollSender`].
352    pub fn new(sender: PollSender<T>, scope: &'static str) -> Self {
353        Self { sender, metrics: MeteredPollSenderMetrics::new(scope) }
354    }
355
356    /// Returns the underlying [`PollSender`].
357    pub const fn inner(&self) -> &PollSender<T> {
358        &self.sender
359    }
360
361    /// Calls the underlying [`PollSender`]'s `poll_reserve`, incrementing the appropriate
362    /// metrics depending on the result.
363    pub fn poll_reserve(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), PollSendError<T>>> {
364        match self.sender.poll_reserve(cx) {
365            Poll::Ready(Ok(permit)) => Poll::Ready(Ok(permit)),
366            Poll::Ready(Err(error)) => Poll::Ready(Err(error)),
367            Poll::Pending => {
368                self.metrics.back_pressure_total.increment(1);
369                Poll::Pending
370            }
371        }
372    }
373
374    /// Calls the underlying [`PollSender`]'s `send_item`, incrementing the appropriate
375    /// metrics depending on the result.
376    pub fn send_item(&mut self, item: T) -> Result<(), PollSendError<T>> {
377        match self.sender.send_item(item) {
378            Ok(()) => {
379                self.metrics.messages_sent_total.increment(1);
380                Ok(())
381            }
382            Err(error) => Err(error),
383        }
384    }
385}
386
387impl<T> Clone for MeteredPollSender<T> {
388    fn clone(&self) -> Self {
389        Self { sender: self.sender.clone(), metrics: self.metrics.clone() }
390    }
391}
392
393/// Throughput metrics for [`MeteredPollSender`]
394#[derive(Clone, Metrics)]
395#[metrics(dynamic = true)]
396struct MeteredPollSenderMetrics {
397    /// Number of messages sent
398    messages_sent_total: Counter,
399    /// Number of delayed message deliveries caused by a full channel
400    back_pressure_total: Counter,
401}