reth_tokio_util/
event_sender.rs
1use crate::EventStream;
2use tokio::sync::broadcast::{self, Sender};
3use tracing::trace;
4
5const DEFAULT_SIZE_BROADCAST_CHANNEL: usize = 2000;
6
7#[derive(Debug)]
9pub struct EventSender<T> {
10 sender: Sender<T>,
12}
13
14impl<T> Default for EventSender<T>
15where
16 T: Clone + Send + Sync + 'static,
17{
18 fn default() -> Self {
19 Self::new(DEFAULT_SIZE_BROADCAST_CHANNEL)
20 }
21}
22
23impl<T> Clone for EventSender<T> {
24 fn clone(&self) -> Self {
25 Self { sender: self.sender.clone() }
26 }
27}
28
29impl<T: Clone + Send + Sync + 'static> EventSender<T> {
30 pub fn new(events_channel_size: usize) -> Self {
32 let (sender, _) = broadcast::channel(events_channel_size);
33 Self { sender }
34 }
35
36 pub fn notify(&self, event: T) {
38 if self.sender.send(event).is_err() {
39 trace!("no receivers for broadcast events");
40 }
41 }
42
43 pub fn new_listener(&self) -> EventStream<T> {
46 EventStream::new(self.sender.subscribe())
47 }
48}
49
50#[cfg(test)]
51mod tests {
52 use super::*;
53 use tokio::{
54 task,
55 time::{timeout, Duration},
56 };
57 use tokio_stream::StreamExt;
58
59 #[tokio::test]
60 async fn test_event_broadcast_to_listener() {
61 let sender = EventSender::default();
62
63 let mut listener = sender.new_listener();
65
66 sender.notify("event1");
68
69 let received_event = listener.next().await;
71 assert_eq!(received_event, Some("event1"));
72 }
73
74 #[tokio::test]
75 async fn test_event_no_listener() {
76 let sender = EventSender::default();
77
78 sender.notify("event2");
80
81 }
84
85 #[tokio::test]
86 async fn test_multiple_listeners_receive_event() {
87 let sender = EventSender::default();
88
89 let mut listener1 = sender.new_listener();
91 let mut listener2 = sender.new_listener();
92
93 sender.notify("event3");
95
96 let event1 = listener1.next().await;
98 let event2 = listener2.next().await;
99
100 assert_eq!(event1, Some("event3"));
101 assert_eq!(event2, Some("event3"));
102 }
103
104 #[tokio::test]
105 async fn test_bounded_channel_size() {
106 let sender = EventSender::new(2);
108
109 let mut listener = sender.new_listener();
111
112 sender.notify("event4");
114 sender.notify("event5");
115 sender.notify("event6");
116
117 let received_event1 = listener.next().await;
119 let received_event2 = listener.next().await;
120
121 assert_eq!(received_event1, Some("event5"));
122 assert_eq!(received_event2, Some("event6"));
123 }
124
125 #[tokio::test]
126 async fn test_event_listener_timeout() {
127 let sender = EventSender::default();
128 let mut listener = sender.new_listener();
129
130 task::spawn(async move {
132 tokio::time::sleep(Duration::from_millis(50)).await;
133 sender.notify("delayed_event");
134 });
135
136 let result = timeout(Duration::from_millis(100), listener.next()).await;
138 assert!(result.is_ok());
139 assert_eq!(result.unwrap(), Some("delayed_event"));
140 }
141}