reth_tokio_util/
event_sender.rs

1use crate::EventStream;
2use tokio::sync::broadcast::{self, Sender};
3use tracing::trace;
4
5const DEFAULT_SIZE_BROADCAST_CHANNEL: usize = 2000;
6
7/// A bounded multi-producer, multi-consumer broadcast channel.
8#[derive(Debug)]
9pub struct EventSender<T> {
10    /// The sender part of the broadcast channel
11    sender: Sender<T>,
12}
13
14impl<T> Default for EventSender<T>
15where
16    T: Clone + Send + Sync + 'static,
17{
18    fn default() -> Self {
19        Self::new(DEFAULT_SIZE_BROADCAST_CHANNEL)
20    }
21}
22
23impl<T> Clone for EventSender<T> {
24    fn clone(&self) -> Self {
25        Self { sender: self.sender.clone() }
26    }
27}
28
29impl<T: Clone + Send + Sync + 'static> EventSender<T> {
30    /// Creates a new `EventSender`.
31    pub fn new(events_channel_size: usize) -> Self {
32        let (sender, _) = broadcast::channel(events_channel_size);
33        Self { sender }
34    }
35
36    /// Broadcasts an event to all listeners.
37    pub fn notify(&self, event: T) {
38        if self.sender.send(event).is_err() {
39            trace!("no receivers for broadcast events");
40        }
41    }
42
43    /// Creates a new event stream with a subscriber to the sender as the
44    /// receiver.
45    pub fn new_listener(&self) -> EventStream<T> {
46        EventStream::new(self.sender.subscribe())
47    }
48}
49
50#[cfg(test)]
51mod tests {
52    use super::*;
53    use tokio::{
54        task,
55        time::{timeout, Duration},
56    };
57    use tokio_stream::StreamExt;
58
59    #[tokio::test]
60    async fn test_event_broadcast_to_listener() {
61        let sender = EventSender::default();
62
63        // Create a listener for the events
64        let mut listener = sender.new_listener();
65
66        // Broadcast an event
67        sender.notify("event1");
68
69        // Check if the listener receives the event
70        let received_event = listener.next().await;
71        assert_eq!(received_event, Some("event1"));
72    }
73
74    #[tokio::test]
75    async fn test_event_no_listener() {
76        let sender = EventSender::default();
77
78        // Broadcast an event with no listeners
79        sender.notify("event2");
80
81        // Ensure it doesn't panic or fail when no listeners are present
82        // (this test passes if it runs without errors).
83    }
84
85    #[tokio::test]
86    async fn test_multiple_listeners_receive_event() {
87        let sender = EventSender::default();
88
89        // Create two listeners
90        let mut listener1 = sender.new_listener();
91        let mut listener2 = sender.new_listener();
92
93        // Broadcast an event
94        sender.notify("event3");
95
96        // Both listeners should receive the same event
97        let event1 = listener1.next().await;
98        let event2 = listener2.next().await;
99
100        assert_eq!(event1, Some("event3"));
101        assert_eq!(event2, Some("event3"));
102    }
103
104    #[tokio::test]
105    async fn test_bounded_channel_size() {
106        // Create a channel with size 2
107        let sender = EventSender::new(2);
108
109        // Create a listener
110        let mut listener = sender.new_listener();
111
112        // Broadcast 3 events, which exceeds the channel size
113        sender.notify("event4");
114        sender.notify("event5");
115        sender.notify("event6");
116
117        // Only the last two should be received due to the size limit
118        let received_event1 = listener.next().await;
119        let received_event2 = listener.next().await;
120
121        assert_eq!(received_event1, Some("event5"));
122        assert_eq!(received_event2, Some("event6"));
123    }
124
125    #[tokio::test]
126    async fn test_event_listener_timeout() {
127        let sender = EventSender::default();
128        let mut listener = sender.new_listener();
129
130        // Broadcast an event asynchronously
131        task::spawn(async move {
132            tokio::time::sleep(Duration::from_millis(50)).await;
133            sender.notify("delayed_event");
134        });
135
136        // Use a timeout to ensure that the event is received within a certain time
137        let result = timeout(Duration::from_millis(100), listener.next()).await;
138        assert!(result.is_ok());
139        assert_eq!(result.unwrap(), Some("delayed_event"));
140    }
141}