// reth_tokio_util/event_sender.rs
use crate::EventStream;
use tokio::sync::broadcast::{self, Sender};
use tracing::trace;
/// Capacity passed to the broadcast channel when an [`EventSender`] is built via
/// [`Default::default`].
const DEFAULT_SIZE_BROADCAST_CHANNEL: usize = 2000;
/// A cloneable handle for publishing events of type `T` over a tokio broadcast channel.
///
/// Events are pushed with `notify`, and consumers obtain their own stream of events with
/// `new_listener`.
#[derive(Debug, Clone)]
pub struct EventSender<T> {
/// Sending half of the underlying broadcast channel; new receivers are subscribed from it.
sender: Sender<T>,
}
impl<T: Clone + Send + Sync + 'static> Default for EventSender<T> {
    /// Builds an [`EventSender`] whose channel capacity is
    /// `DEFAULT_SIZE_BROADCAST_CHANNEL`.
    fn default() -> Self {
        let capacity = DEFAULT_SIZE_BROADCAST_CHANNEL;
        Self::new(capacity)
    }
}
impl<T> EventSender<T>
where
    T: Clone + Send + Sync + 'static,
{
    /// Creates a sender backed by a broadcast channel holding up to `events_channel_size`
    /// events.
    ///
    /// # Panics
    ///
    /// Panics under the same conditions as [`tokio::sync::broadcast::channel`], which
    /// documents a panic when the capacity is `0` or larger than `usize::MAX / 2`.
    pub fn new(events_channel_size: usize) -> Self {
        // Only the sending half is retained; receivers are created on demand through
        // `subscribe` in `new_listener`.
        let (sender, _initial_receiver) = broadcast::channel::<T>(events_channel_size);
        Self { sender }
    }

    /// Broadcasts `event` to every listener that is currently subscribed.
    ///
    /// A failed send is not treated as an error: the event is dropped and a trace-level
    /// message noting the absence of receivers is logged.
    pub fn notify(&self, event: T) {
        let delivered = self.sender.send(event).is_ok();
        if !delivered {
            trace!("no receivers for broadcast events");
        }
    }

    /// Subscribes to the channel and returns the subscription wrapped in an
    /// [`EventStream`].
    pub fn new_listener(&self) -> EventStream<T> {
        let receiver = self.sender.subscribe();
        EventStream::new(receiver)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::{
        task,
        time::{sleep, timeout, Duration},
    };
    use tokio_stream::StreamExt;

    /// A single listener receives the event that was sent after it subscribed.
    #[tokio::test]
    async fn test_event_broadcast_to_listener() {
        let events = EventSender::default();
        let mut stream = events.new_listener();

        events.notify("event1");

        assert_eq!(stream.next().await, Some("event1"));
    }

    /// Notifying without any listener must not panic.
    #[tokio::test]
    async fn test_event_no_listener() {
        let events = EventSender::default();
        events.notify("event2");
    }

    /// Every subscribed listener gets its own copy of the event.
    #[tokio::test]
    async fn test_multiple_listeners_receive_event() {
        let events = EventSender::default();
        let mut first_stream = events.new_listener();
        let mut second_stream = events.new_listener();

        events.notify("event3");

        let from_first = first_stream.next().await;
        let from_second = second_stream.next().await;
        assert_eq!(from_first, Some("event3"));
        assert_eq!(from_second, Some("event3"));
    }

    /// With a capacity of two and three events sent, the listener is expected to yield
    /// only the two most recent events.
    #[tokio::test]
    async fn test_bounded_channel_size() {
        let events = EventSender::new(2);
        let mut stream = events.new_listener();

        for event in ["event4", "event5", "event6"] {
            events.notify(event);
        }

        let first_seen = stream.next().await;
        let second_seen = stream.next().await;
        assert_eq!(first_seen, Some("event5"));
        assert_eq!(second_seen, Some("event6"));
    }

    /// An event sent after a short delay still arrives before the listener's timeout.
    #[tokio::test]
    async fn test_event_listener_timeout() {
        let events = EventSender::default();
        let mut stream = events.new_listener();

        // The sender is moved into a background task that notifies after 50ms.
        task::spawn(async move {
            sleep(Duration::from_millis(50)).await;
            events.notify("delayed_event");
        });

        let outcome = timeout(Duration::from_millis(100), stream.next()).await;
        assert!(outcome.is_ok());
        assert_eq!(outcome.unwrap(), Some("delayed_event"));
    }
}