reth_transaction_pool/pool/
listener.rs

1//! Listeners for the transaction-pool
2
3use crate::{
4    pool::events::{FullTransactionEvent, NewTransactionEvent, TransactionEvent},
5    traits::{NewBlobSidecar, PropagateKind},
6    PoolTransaction, ValidPoolTransaction,
7};
8use alloy_primitives::{TxHash, B256};
9use futures_util::Stream;
10use std::{
11    collections::{hash_map::Entry, HashMap},
12    pin::Pin,
13    sync::Arc,
14    task::{Context, Poll},
15};
16use tokio::sync::mpsc::{
17    self as mpsc, error::TrySendError, Receiver, Sender, UnboundedReceiver, UnboundedSender,
18};
19use tracing::debug;
/// Capacity of the bounded channel handed to each subscriber of all transaction events
/// (see `PoolEventBroadcast::subscribe_all`).
const TX_POOL_EVENT_CHANNEL_SIZE: usize = 1024;
22
23/// A Stream that receives [`TransactionEvent`] only for the transaction with the given hash.
24#[derive(Debug)]
25#[must_use = "streams do nothing unless polled"]
26pub struct TransactionEvents {
27    hash: TxHash,
28    events: UnboundedReceiver<TransactionEvent>,
29}
30
31impl TransactionEvents {
32    /// Create a new instance of this stream.
33    pub const fn new(hash: TxHash, events: UnboundedReceiver<TransactionEvent>) -> Self {
34        Self { hash, events }
35    }
36
37    /// The hash for this transaction
38    pub const fn hash(&self) -> TxHash {
39        self.hash
40    }
41}
42
43impl Stream for TransactionEvents {
44    type Item = TransactionEvent;
45
46    fn poll_next(
47        self: std::pin::Pin<&mut Self>,
48        cx: &mut std::task::Context<'_>,
49    ) -> std::task::Poll<Option<Self::Item>> {
50        self.get_mut().events.poll_recv(cx)
51    }
52}
53
54/// A Stream that receives [`FullTransactionEvent`] for _all_ transaction.
55#[derive(Debug)]
56#[must_use = "streams do nothing unless polled"]
57pub struct AllTransactionsEvents<T: PoolTransaction> {
58    pub(crate) events: Receiver<FullTransactionEvent<T>>,
59}
60
61impl<T: PoolTransaction> AllTransactionsEvents<T> {
62    /// Create a new instance of this stream.
63    pub const fn new(events: Receiver<FullTransactionEvent<T>>) -> Self {
64        Self { events }
65    }
66}
67
68impl<T: PoolTransaction> Stream for AllTransactionsEvents<T> {
69    type Item = FullTransactionEvent<T>;
70
71    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
72        self.get_mut().events.poll_recv(cx)
73    }
74}
75
76/// A type that broadcasts [`TransactionEvent`] to installed listeners.
77///
78/// This is essentially a multi-producer, multi-consumer channel where each event is broadcast to
79/// all active receivers.
80#[derive(Debug)]
81pub(crate) struct PoolEventBroadcast<T: PoolTransaction> {
82    /// All listeners for all transaction events.
83    all_events_broadcaster: AllPoolEventsBroadcaster<T>,
84    /// All listeners for events for a certain transaction hash.
85    broadcasters_by_hash: HashMap<TxHash, PoolEventBroadcaster>,
86}
87
88impl<T: PoolTransaction> Default for PoolEventBroadcast<T> {
89    fn default() -> Self {
90        Self {
91            all_events_broadcaster: AllPoolEventsBroadcaster::default(),
92            broadcasters_by_hash: HashMap::default(),
93        }
94    }
95}
96
97impl<T: PoolTransaction> PoolEventBroadcast<T> {
98    /// Calls the broadcast callback with the `PoolEventBroadcaster` that belongs to the hash.
99    fn broadcast_event(
100        &mut self,
101        hash: &TxHash,
102        event: TransactionEvent,
103        pool_event: FullTransactionEvent<T>,
104    ) {
105        // Broadcast to all listeners for the transaction hash.
106        if let Entry::Occupied(mut sink) = self.broadcasters_by_hash.entry(*hash) {
107            sink.get_mut().broadcast(event.clone());
108
109            if sink.get().is_empty() || event.is_final() {
110                sink.remove();
111            }
112        }
113
114        // Broadcast to all listeners for all transactions.
115        self.all_events_broadcaster.broadcast(pool_event);
116    }
117
118    /// Returns true if no listeners are installed
119    #[inline]
120    pub(crate) fn is_empty(&self) -> bool {
121        self.all_events_broadcaster.is_empty() && self.broadcasters_by_hash.is_empty()
122    }
123
124    /// Create a new subscription for the given transaction hash.
125    pub(crate) fn subscribe(&mut self, tx_hash: TxHash) -> TransactionEvents {
126        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
127
128        match self.broadcasters_by_hash.entry(tx_hash) {
129            Entry::Occupied(mut entry) => {
130                entry.get_mut().senders.push(tx);
131            }
132            Entry::Vacant(entry) => {
133                entry.insert(PoolEventBroadcaster { senders: vec![tx] });
134            }
135        };
136        TransactionEvents { hash: tx_hash, events: rx }
137    }
138
139    /// Create a new subscription for all transactions.
140    pub(crate) fn subscribe_all(&mut self) -> AllTransactionsEvents<T> {
141        let (tx, rx) = tokio::sync::mpsc::channel(TX_POOL_EVENT_CHANNEL_SIZE);
142        self.all_events_broadcaster.senders.push(tx);
143        AllTransactionsEvents::new(rx)
144    }
145
146    /// Notify listeners about a transaction that was added to the pending queue.
147    pub(crate) fn pending(&mut self, tx: &TxHash, replaced: Option<Arc<ValidPoolTransaction<T>>>) {
148        self.broadcast_event(tx, TransactionEvent::Pending, FullTransactionEvent::Pending(*tx));
149
150        if let Some(replaced) = replaced {
151            // notify listeners that this transaction was replaced
152            self.replaced(replaced, *tx);
153        }
154    }
155
156    /// Notify listeners about a transaction that was replaced.
157    pub(crate) fn replaced(&mut self, tx: Arc<ValidPoolTransaction<T>>, replaced_by: TxHash) {
158        let transaction = Arc::clone(&tx);
159        self.broadcast_event(
160            tx.hash(),
161            TransactionEvent::Replaced(replaced_by),
162            FullTransactionEvent::Replaced { transaction, replaced_by },
163        );
164    }
165
166    /// Notify listeners about a transaction that was added to the queued pool.
167    pub(crate) fn queued(&mut self, tx: &TxHash) {
168        self.broadcast_event(tx, TransactionEvent::Queued, FullTransactionEvent::Queued(*tx));
169    }
170
171    /// Notify listeners about a transaction that was propagated.
172    pub(crate) fn propagated(&mut self, tx: &TxHash, peers: Vec<PropagateKind>) {
173        let peers = Arc::new(peers);
174        self.broadcast_event(
175            tx,
176            TransactionEvent::Propagated(Arc::clone(&peers)),
177            FullTransactionEvent::Propagated(peers),
178        );
179    }
180
181    /// Notify listeners about all discarded transactions.
182    #[inline]
183    pub(crate) fn discarded_many(&mut self, discarded: &[Arc<ValidPoolTransaction<T>>]) {
184        if self.is_empty() {
185            return
186        }
187        for tx in discarded {
188            self.discarded(tx.hash());
189        }
190    }
191
192    /// Notify listeners about a transaction that was discarded.
193    pub(crate) fn discarded(&mut self, tx: &TxHash) {
194        self.broadcast_event(tx, TransactionEvent::Discarded, FullTransactionEvent::Discarded(*tx));
195    }
196
197    /// Notify listeners about a transaction that was invalid.
198    pub(crate) fn invalid(&mut self, tx: &TxHash) {
199        self.broadcast_event(tx, TransactionEvent::Invalid, FullTransactionEvent::Invalid(*tx));
200    }
201
202    /// Notify listeners that the transaction was mined
203    pub(crate) fn mined(&mut self, tx: &TxHash, block_hash: B256) {
204        self.broadcast_event(
205            tx,
206            TransactionEvent::Mined(block_hash),
207            FullTransactionEvent::Mined { tx_hash: *tx, block_hash },
208        );
209    }
210}
211
212/// All Sender half(s) of the event channels for all transactions.
213///
214/// This mimics [`tokio::sync::broadcast`] but uses separate channels.
215#[derive(Debug)]
216struct AllPoolEventsBroadcaster<T: PoolTransaction> {
217    /// Corresponding sender half(s) for event listener channel
218    senders: Vec<Sender<FullTransactionEvent<T>>>,
219}
220
221impl<T: PoolTransaction> Default for AllPoolEventsBroadcaster<T> {
222    fn default() -> Self {
223        Self { senders: Vec::new() }
224    }
225}
226
227impl<T: PoolTransaction> AllPoolEventsBroadcaster<T> {
228    // Broadcast an event to all listeners. Dropped listeners are silently evicted.
229    fn broadcast(&mut self, event: FullTransactionEvent<T>) {
230        self.senders.retain(|sender| match sender.try_send(event.clone()) {
231            Ok(_) | Err(TrySendError::Full(_)) => true,
232            Err(TrySendError::Closed(_)) => false,
233        })
234    }
235
236    /// Returns true if there are no listeners installed.
237    #[inline]
238    const fn is_empty(&self) -> bool {
239        self.senders.is_empty()
240    }
241}
242
243/// All Sender half(s) of the event channels for a specific transaction.
244///
245/// This mimics [`tokio::sync::broadcast`] but uses separate channels and is unbounded.
246#[derive(Default, Debug)]
247struct PoolEventBroadcaster {
248    /// Corresponding sender half(s) for event listener channel
249    senders: Vec<UnboundedSender<TransactionEvent>>,
250}
251
252impl PoolEventBroadcaster {
253    /// Returns `true` if there are no more listeners remaining.
254    const fn is_empty(&self) -> bool {
255        self.senders.is_empty()
256    }
257
258    // Broadcast an event to all listeners. Dropped listeners are silently evicted.
259    fn broadcast(&mut self, event: TransactionEvent) {
260        self.senders.retain(|sender| sender.send(event.clone()).is_ok())
261    }
262}
263
264/// An active listener for new pending transactions.
265#[derive(Debug)]
266pub(crate) struct PendingTransactionHashListener {
267    pub(crate) sender: mpsc::Sender<TxHash>,
268    /// Whether to include transactions that should not be propagated over the network.
269    pub(crate) kind: TransactionListenerKind,
270}
271
272impl PendingTransactionHashListener {
273    /// Attempts to send all hashes to the listener.
274    ///
275    /// Returns false if the channel is closed (receiver dropped)
276    pub(crate) fn send_all(&self, hashes: impl IntoIterator<Item = TxHash>) -> bool {
277        for tx_hash in hashes {
278            match self.sender.try_send(tx_hash) {
279                Ok(()) => {}
280                Err(err) => {
281                    return if matches!(err, mpsc::error::TrySendError::Full(_)) {
282                        debug!(
283                            target: "txpool",
284                            "[{:?}] failed to send pending tx; channel full",
285                            tx_hash,
286                        );
287                        true
288                    } else {
289                        false
290                    }
291                }
292            }
293        }
294        true
295    }
296}
297
298/// An active listener for new pending transactions.
299#[derive(Debug)]
300pub(crate) struct TransactionListener<T: PoolTransaction> {
301    pub(crate) sender: mpsc::Sender<NewTransactionEvent<T>>,
302    /// Whether to include transactions that should not be propagated over the network.
303    pub(crate) kind: TransactionListenerKind,
304}
305
306impl<T: PoolTransaction> TransactionListener<T> {
307    /// Attempts to send the event to the listener.
308    ///
309    /// Returns false if the channel is closed (receiver dropped)
310    pub(crate) fn send(&self, event: NewTransactionEvent<T>) -> bool {
311        self.send_all(std::iter::once(event))
312    }
313
314    /// Attempts to send all events to the listener.
315    ///
316    /// Returns false if the channel is closed (receiver dropped)
317    pub(crate) fn send_all(
318        &self,
319        events: impl IntoIterator<Item = NewTransactionEvent<T>>,
320    ) -> bool {
321        for event in events {
322            match self.sender.try_send(event) {
323                Ok(()) => {}
324                Err(err) => {
325                    return if let mpsc::error::TrySendError::Full(event) = err {
326                        debug!(
327                            target: "txpool",
328                            "[{:?}] failed to send pending tx; channel full",
329                            event.transaction.hash(),
330                        );
331                        true
332                    } else {
333                        false
334                    }
335                }
336            }
337        }
338        true
339    }
340}
341
342/// An active listener for new blobs
343#[derive(Debug)]
344pub(crate) struct BlobTransactionSidecarListener {
345    pub(crate) sender: mpsc::Sender<NewBlobSidecar>,
346}
347
/// Selects which new transactions a stream of transactions should emit.
///
/// Lets a subscriber choose between every transaction and only those that are allowed to be
/// propagated.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum TransactionListenerKind {
    /// Any new pending transactions
    All,
    /// Only transactions that are allowed to be propagated.
    ///
    /// See also [`ValidPoolTransaction`]
    PropagateOnly,
}

impl TransactionListenerKind {
    /// Returns true if only transactions that are allowed to be propagated are of interest.
    #[inline]
    pub const fn is_propagate_only(&self) -> bool {
        match self {
            Self::PropagateOnly => true,
            Self::All => false,
        }
    }
}
367}