reth_transaction_pool/pool/
listener.rs
use crate::{
pool::events::{FullTransactionEvent, TransactionEvent},
traits::PropagateKind,
PoolTransaction, ValidPoolTransaction,
};
use alloy_primitives::{TxHash, B256};
use futures_util::Stream;
use std::{
collections::{hash_map::Entry, HashMap},
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use tokio::sync::mpsc::{
error::TrySendError, Receiver, Sender, UnboundedReceiver, UnboundedSender,
};
const TX_POOL_EVENT_CHANNEL_SIZE: usize = 1024;
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct TransactionEvents {
hash: TxHash,
events: UnboundedReceiver<TransactionEvent>,
}
impl TransactionEvents {
pub const fn hash(&self) -> TxHash {
self.hash
}
}
impl Stream for TransactionEvents {
type Item = TransactionEvent;
fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
self.get_mut().events.poll_recv(cx)
}
}
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct AllTransactionsEvents<T: PoolTransaction> {
pub(crate) events: Receiver<FullTransactionEvent<T>>,
}
impl<T: PoolTransaction> AllTransactionsEvents<T> {
pub const fn new(events: Receiver<FullTransactionEvent<T>>) -> Self {
Self { events }
}
}
impl<T: PoolTransaction> Stream for AllTransactionsEvents<T> {
type Item = FullTransactionEvent<T>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.get_mut().events.poll_recv(cx)
}
}
#[derive(Debug)]
pub(crate) struct PoolEventBroadcast<T: PoolTransaction> {
all_events_broadcaster: AllPoolEventsBroadcaster<T>,
broadcasters_by_hash: HashMap<TxHash, PoolEventBroadcaster>,
}
impl<T: PoolTransaction> Default for PoolEventBroadcast<T> {
fn default() -> Self {
Self {
all_events_broadcaster: AllPoolEventsBroadcaster::default(),
broadcasters_by_hash: HashMap::default(),
}
}
}
impl<T: PoolTransaction> PoolEventBroadcast<T> {
fn broadcast_event(
&mut self,
hash: &TxHash,
event: TransactionEvent,
pool_event: FullTransactionEvent<T>,
) {
if let Entry::Occupied(mut sink) = self.broadcasters_by_hash.entry(*hash) {
sink.get_mut().broadcast(event.clone());
if sink.get().is_empty() || event.is_final() {
sink.remove();
}
}
self.all_events_broadcaster.broadcast(pool_event);
}
pub(crate) fn subscribe(&mut self, tx_hash: TxHash) -> TransactionEvents {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
match self.broadcasters_by_hash.entry(tx_hash) {
Entry::Occupied(mut entry) => {
entry.get_mut().senders.push(tx);
}
Entry::Vacant(entry) => {
entry.insert(PoolEventBroadcaster { senders: vec![tx] });
}
};
TransactionEvents { hash: tx_hash, events: rx }
}
pub(crate) fn subscribe_all(&mut self) -> AllTransactionsEvents<T> {
let (tx, rx) = tokio::sync::mpsc::channel(TX_POOL_EVENT_CHANNEL_SIZE);
self.all_events_broadcaster.senders.push(tx);
AllTransactionsEvents::new(rx)
}
pub(crate) fn pending(&mut self, tx: &TxHash, replaced: Option<Arc<ValidPoolTransaction<T>>>) {
self.broadcast_event(tx, TransactionEvent::Pending, FullTransactionEvent::Pending(*tx));
if let Some(replaced) = replaced {
self.replaced(replaced, *tx);
}
}
pub(crate) fn replaced(&mut self, tx: Arc<ValidPoolTransaction<T>>, replaced_by: TxHash) {
let transaction = Arc::clone(&tx);
self.broadcast_event(
tx.hash(),
TransactionEvent::Replaced(replaced_by),
FullTransactionEvent::Replaced { transaction, replaced_by },
);
}
pub(crate) fn queued(&mut self, tx: &TxHash) {
self.broadcast_event(tx, TransactionEvent::Queued, FullTransactionEvent::Queued(*tx));
}
pub(crate) fn propagated(&mut self, tx: &TxHash, peers: Vec<PropagateKind>) {
let peers = Arc::new(peers);
self.broadcast_event(
tx,
TransactionEvent::Propagated(Arc::clone(&peers)),
FullTransactionEvent::Propagated(peers),
);
}
pub(crate) fn discarded(&mut self, tx: &TxHash) {
self.broadcast_event(tx, TransactionEvent::Discarded, FullTransactionEvent::Discarded(*tx));
}
pub(crate) fn mined(&mut self, tx: &TxHash, block_hash: B256) {
self.broadcast_event(
tx,
TransactionEvent::Mined(block_hash),
FullTransactionEvent::Mined { tx_hash: *tx, block_hash },
);
}
}
#[derive(Debug)]
struct AllPoolEventsBroadcaster<T: PoolTransaction> {
senders: Vec<Sender<FullTransactionEvent<T>>>,
}
impl<T: PoolTransaction> Default for AllPoolEventsBroadcaster<T> {
fn default() -> Self {
Self { senders: Vec::new() }
}
}
impl<T: PoolTransaction> AllPoolEventsBroadcaster<T> {
fn broadcast(&mut self, event: FullTransactionEvent<T>) {
self.senders.retain(|sender| match sender.try_send(event.clone()) {
Ok(_) | Err(TrySendError::Full(_)) => true,
Err(TrySendError::Closed(_)) => false,
})
}
}
#[derive(Default, Debug)]
struct PoolEventBroadcaster {
senders: Vec<UnboundedSender<TransactionEvent>>,
}
impl PoolEventBroadcaster {
fn is_empty(&self) -> bool {
self.senders.is_empty()
}
fn broadcast(&mut self, event: TransactionEvent) {
self.senders.retain(|sender| sender.send(event.clone()).is_ok())
}
}