1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
use crate::EventStream;
use tokio::sync::broadcast::{self, Sender};
use tracing::trace;

/// Channel capacity used when an `EventSender` is created through its
/// `Default` implementation.
const DEFAULT_SIZE_BROADCAST_CHANNEL: usize = 2_000;

/// Cloneable handle for publishing events of type `T` over a bounded
/// broadcast channel.
///
/// Only the sending half of the channel is stored here; receivers are created
/// on demand by subscribing to it.
#[derive(Clone, Debug)]
pub struct EventSender<T> {
    /// Sending half of the underlying broadcast channel.
    sender: Sender<T>,
}

impl<T> Default for EventSender<T>
where
    T: Clone + Send + Sync + 'static,
{
    fn default() -> Self {
        Self::new(DEFAULT_SIZE_BROADCAST_CHANNEL)
    }
}

impl<T> EventSender<T>
where
    T: Clone + Send + Sync + 'static,
{
    /// Builds an `EventSender` backed by a broadcast channel that can hold
    /// `events_channel_size` events.
    ///
    /// The receiver half handed back by the channel constructor is discarded;
    /// listeners are attached afterwards via [`Self::new_listener`].
    ///
    /// # Panics
    ///
    /// Per the tokio documentation, `broadcast::channel` panics when the
    /// requested capacity is `0` or larger than `usize::MAX / 2`, so this
    /// constructor panics for those values as well.
    pub fn new(events_channel_size: usize) -> Self {
        // Keep only the sending half; the paired receiver is dropped with the
        // temporary tuple.
        Self {
            sender: broadcast::channel(events_channel_size).0,
        }
    }

    /// Sends `event` to every current listener.
    ///
    /// A failed send is not propagated to the caller: it is reported with a
    /// trace-level log line and otherwise ignored.
    pub fn notify(&self, event: T) {
        if self.sender.send(event).is_ok() {
            return;
        }
        trace!("no receivers for broadcast events");
    }

    /// Subscribes to this sender and wraps the resulting receiver in an
    /// [`EventStream`].
    pub fn new_listener(&self) -> EventStream<T> {
        let receiver = self.sender.subscribe();
        EventStream::new(receiver)
    }
}