reth_transaction_pool/pool/
listener.rs

1//! Listeners for the transaction-pool
2
3use crate::{
4    pool::{
5        events::{FullTransactionEvent, NewTransactionEvent, TransactionEvent},
6        QueuedReason,
7    },
8    traits::{NewBlobSidecar, PropagateKind},
9    PoolTransaction, ValidPoolTransaction,
10};
11use alloy_primitives::{TxHash, B256};
12use futures_util::Stream;
13use std::{
14    collections::{hash_map::Entry, HashMap},
15    pin::Pin,
16    sync::Arc,
17    task::{Context, Poll},
18};
19use tokio::sync::mpsc::{
20    self as mpsc, error::TrySendError, Receiver, Sender, UnboundedReceiver, UnboundedSender,
21};
22use tracing::debug;
23
/// The size of the event channel used to propagate transaction events.
///
/// This is the capacity passed to the bounded channel that
/// `PoolEventBroadcast::subscribe_all` creates for each "all transactions" subscriber.
const TX_POOL_EVENT_CHANNEL_SIZE: usize = 1024;
26
27/// A Stream that receives [`TransactionEvent`] only for the transaction with the given hash.
28#[derive(Debug)]
29#[must_use = "streams do nothing unless polled"]
30pub struct TransactionEvents {
31    hash: TxHash,
32    events: UnboundedReceiver<TransactionEvent>,
33}
34
35impl TransactionEvents {
36    /// Create a new instance of this stream.
37    pub const fn new(hash: TxHash, events: UnboundedReceiver<TransactionEvent>) -> Self {
38        Self { hash, events }
39    }
40
41    /// The hash for this transaction
42    pub const fn hash(&self) -> TxHash {
43        self.hash
44    }
45}
46
47impl Stream for TransactionEvents {
48    type Item = TransactionEvent;
49
50    fn poll_next(
51        self: std::pin::Pin<&mut Self>,
52        cx: &mut std::task::Context<'_>,
53    ) -> std::task::Poll<Option<Self::Item>> {
54        self.get_mut().events.poll_recv(cx)
55    }
56}
57
58/// A Stream that receives [`FullTransactionEvent`] for _all_ transaction.
59#[derive(Debug)]
60#[must_use = "streams do nothing unless polled"]
61pub struct AllTransactionsEvents<T: PoolTransaction> {
62    pub(crate) events: Receiver<FullTransactionEvent<T>>,
63}
64
65impl<T: PoolTransaction> AllTransactionsEvents<T> {
66    /// Create a new instance of this stream.
67    pub const fn new(events: Receiver<FullTransactionEvent<T>>) -> Self {
68        Self { events }
69    }
70}
71
72impl<T: PoolTransaction> Stream for AllTransactionsEvents<T> {
73    type Item = FullTransactionEvent<T>;
74
75    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
76        self.get_mut().events.poll_recv(cx)
77    }
78}
79
80/// A type that broadcasts [`TransactionEvent`] to installed listeners.
81///
82/// This is essentially a multi-producer, multi-consumer channel where each event is broadcast to
83/// all active receivers.
84#[derive(Debug)]
85pub(crate) struct PoolEventBroadcast<T: PoolTransaction> {
86    /// All listeners for all transaction events.
87    all_events_broadcaster: AllPoolEventsBroadcaster<T>,
88    /// All listeners for events for a certain transaction hash.
89    broadcasters_by_hash: HashMap<TxHash, PoolEventBroadcaster>,
90}
91
92impl<T: PoolTransaction> Default for PoolEventBroadcast<T> {
93    fn default() -> Self {
94        Self {
95            all_events_broadcaster: AllPoolEventsBroadcaster::default(),
96            broadcasters_by_hash: HashMap::default(),
97        }
98    }
99}
100
101impl<T: PoolTransaction> PoolEventBroadcast<T> {
102    /// Calls the broadcast callback with the `PoolEventBroadcaster` that belongs to the hash.
103    fn broadcast_event(
104        &mut self,
105        hash: &TxHash,
106        event: TransactionEvent,
107        pool_event: FullTransactionEvent<T>,
108    ) {
109        // Broadcast to all listeners for the transaction hash.
110        if let Entry::Occupied(mut sink) = self.broadcasters_by_hash.entry(*hash) {
111            sink.get_mut().broadcast(event.clone());
112
113            if sink.get().is_empty() || event.is_final() {
114                sink.remove();
115            }
116        }
117
118        // Broadcast to all listeners for all transactions.
119        self.all_events_broadcaster.broadcast(pool_event);
120    }
121
122    /// Returns true if no listeners are installed
123    #[inline]
124    pub(crate) fn is_empty(&self) -> bool {
125        self.all_events_broadcaster.is_empty() && self.broadcasters_by_hash.is_empty()
126    }
127
128    /// Create a new subscription for the given transaction hash.
129    pub(crate) fn subscribe(&mut self, tx_hash: TxHash) -> TransactionEvents {
130        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
131
132        match self.broadcasters_by_hash.entry(tx_hash) {
133            Entry::Occupied(mut entry) => {
134                entry.get_mut().senders.push(tx);
135            }
136            Entry::Vacant(entry) => {
137                entry.insert(PoolEventBroadcaster { senders: vec![tx] });
138            }
139        };
140        TransactionEvents { hash: tx_hash, events: rx }
141    }
142
143    /// Create a new subscription for all transactions.
144    pub(crate) fn subscribe_all(&mut self) -> AllTransactionsEvents<T> {
145        let (tx, rx) = tokio::sync::mpsc::channel(TX_POOL_EVENT_CHANNEL_SIZE);
146        self.all_events_broadcaster.senders.push(tx);
147        AllTransactionsEvents::new(rx)
148    }
149
150    /// Notify listeners about a transaction that was added to the pending queue.
151    pub(crate) fn pending(&mut self, tx: &TxHash, replaced: Option<Arc<ValidPoolTransaction<T>>>) {
152        self.broadcast_event(tx, TransactionEvent::Pending, FullTransactionEvent::Pending(*tx));
153
154        if let Some(replaced) = replaced {
155            // notify listeners that this transaction was replaced
156            self.replaced(replaced, *tx);
157        }
158    }
159
160    /// Notify listeners about a transaction that was replaced.
161    pub(crate) fn replaced(&mut self, tx: Arc<ValidPoolTransaction<T>>, replaced_by: TxHash) {
162        let transaction = Arc::clone(&tx);
163        self.broadcast_event(
164            tx.hash(),
165            TransactionEvent::Replaced(replaced_by),
166            FullTransactionEvent::Replaced { transaction, replaced_by },
167        );
168    }
169
170    /// Notify listeners about a transaction that was added to the queued pool.
171    pub(crate) fn queued(&mut self, tx: &TxHash, reason: Option<QueuedReason>) {
172        self.broadcast_event(
173            tx,
174            TransactionEvent::Queued,
175            FullTransactionEvent::Queued(*tx, reason),
176        );
177    }
178
179    /// Notify listeners about a transaction that was propagated.
180    pub(crate) fn propagated(&mut self, tx: &TxHash, peers: Vec<PropagateKind>) {
181        let peers = Arc::new(peers);
182        self.broadcast_event(
183            tx,
184            TransactionEvent::Propagated(Arc::clone(&peers)),
185            FullTransactionEvent::Propagated(peers),
186        );
187    }
188
189    /// Notify listeners about all discarded transactions.
190    #[inline]
191    pub(crate) fn discarded_many(&mut self, discarded: &[Arc<ValidPoolTransaction<T>>]) {
192        if self.is_empty() {
193            return
194        }
195        for tx in discarded {
196            self.discarded(tx.hash());
197        }
198    }
199
200    /// Notify listeners about a transaction that was discarded.
201    pub(crate) fn discarded(&mut self, tx: &TxHash) {
202        self.broadcast_event(tx, TransactionEvent::Discarded, FullTransactionEvent::Discarded(*tx));
203    }
204
205    /// Notify listeners about a transaction that was invalid.
206    pub(crate) fn invalid(&mut self, tx: &TxHash) {
207        self.broadcast_event(tx, TransactionEvent::Invalid, FullTransactionEvent::Invalid(*tx));
208    }
209
210    /// Notify listeners that the transaction was mined
211    pub(crate) fn mined(&mut self, tx: &TxHash, block_hash: B256) {
212        self.broadcast_event(
213            tx,
214            TransactionEvent::Mined(block_hash),
215            FullTransactionEvent::Mined { tx_hash: *tx, block_hash },
216        );
217    }
218}
219
220/// All Sender half(s) of the event channels for all transactions.
221///
222/// This mimics [`tokio::sync::broadcast`] but uses separate channels.
223#[derive(Debug)]
224struct AllPoolEventsBroadcaster<T: PoolTransaction> {
225    /// Corresponding sender half(s) for event listener channel
226    senders: Vec<Sender<FullTransactionEvent<T>>>,
227}
228
229impl<T: PoolTransaction> Default for AllPoolEventsBroadcaster<T> {
230    fn default() -> Self {
231        Self { senders: Vec::new() }
232    }
233}
234
235impl<T: PoolTransaction> AllPoolEventsBroadcaster<T> {
236    // Broadcast an event to all listeners. Dropped listeners are silently evicted.
237    fn broadcast(&mut self, event: FullTransactionEvent<T>) {
238        self.senders.retain(|sender| match sender.try_send(event.clone()) {
239            Ok(_) | Err(TrySendError::Full(_)) => true,
240            Err(TrySendError::Closed(_)) => false,
241        })
242    }
243
244    /// Returns true if there are no listeners installed.
245    #[inline]
246    const fn is_empty(&self) -> bool {
247        self.senders.is_empty()
248    }
249}
250
251/// All Sender half(s) of the event channels for a specific transaction.
252///
253/// This mimics [`tokio::sync::broadcast`] but uses separate channels and is unbounded.
254#[derive(Default, Debug)]
255struct PoolEventBroadcaster {
256    /// Corresponding sender half(s) for event listener channel
257    senders: Vec<UnboundedSender<TransactionEvent>>,
258}
259
260impl PoolEventBroadcaster {
261    /// Returns `true` if there are no more listeners remaining.
262    const fn is_empty(&self) -> bool {
263        self.senders.is_empty()
264    }
265
266    // Broadcast an event to all listeners. Dropped listeners are silently evicted.
267    fn broadcast(&mut self, event: TransactionEvent) {
268        self.senders.retain(|sender| sender.send(event.clone()).is_ok())
269    }
270}
271
272/// An active listener for new pending transactions.
273#[derive(Debug)]
274pub(crate) struct PendingTransactionHashListener {
275    pub(crate) sender: mpsc::Sender<TxHash>,
276    /// Whether to include transactions that should not be propagated over the network.
277    pub(crate) kind: TransactionListenerKind,
278}
279
280impl PendingTransactionHashListener {
281    /// Attempts to send all hashes to the listener.
282    ///
283    /// Returns false if the channel is closed (receiver dropped)
284    pub(crate) fn send_all(&self, hashes: impl IntoIterator<Item = TxHash>) -> bool {
285        for tx_hash in hashes {
286            match self.sender.try_send(tx_hash) {
287                Ok(()) => {}
288                Err(err) => {
289                    return if matches!(err, mpsc::error::TrySendError::Full(_)) {
290                        debug!(
291                            target: "txpool",
292                            "[{:?}] failed to send pending tx; channel full",
293                            tx_hash,
294                        );
295                        true
296                    } else {
297                        false
298                    }
299                }
300            }
301        }
302        true
303    }
304}
305
306/// An active listener for new pending transactions.
307#[derive(Debug)]
308pub(crate) struct TransactionListener<T: PoolTransaction> {
309    pub(crate) sender: mpsc::Sender<NewTransactionEvent<T>>,
310    /// Whether to include transactions that should not be propagated over the network.
311    pub(crate) kind: TransactionListenerKind,
312}
313
314impl<T: PoolTransaction> TransactionListener<T> {
315    /// Attempts to send the event to the listener.
316    ///
317    /// Returns false if the channel is closed (receiver dropped)
318    pub(crate) fn send(&self, event: NewTransactionEvent<T>) -> bool {
319        self.send_all(std::iter::once(event))
320    }
321
322    /// Attempts to send all events to the listener.
323    ///
324    /// Returns false if the channel is closed (receiver dropped)
325    pub(crate) fn send_all(
326        &self,
327        events: impl IntoIterator<Item = NewTransactionEvent<T>>,
328    ) -> bool {
329        for event in events {
330            match self.sender.try_send(event) {
331                Ok(()) => {}
332                Err(err) => {
333                    return if let mpsc::error::TrySendError::Full(event) = err {
334                        debug!(
335                            target: "txpool",
336                            "[{:?}] failed to send pending tx; channel full",
337                            event.transaction.hash(),
338                        );
339                        true
340                    } else {
341                        false
342                    }
343                }
344            }
345        }
346        true
347    }
348}
349
350/// An active listener for new blobs
351#[derive(Debug)]
352pub(crate) struct BlobTransactionSidecarListener {
353    pub(crate) sender: mpsc::Sender<NewBlobSidecar>,
354}
355
/// Selects which new transactions a transaction stream should emit.
///
/// The choice is whether the stream is restricted to transactions that are allowed to be
/// propagated.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum TransactionListenerKind {
    /// Emit any new pending transaction.
    All,
    /// Emit only transactions that are allowed to be propagated.
    ///
    /// See also [`ValidPoolTransaction`]
    PropagateOnly,
}
368
369impl TransactionListenerKind {
370    /// Returns true if we're only interested in transactions that are allowed to be propagated.
371    #[inline]
372    pub const fn is_propagate_only(&self) -> bool {
373        matches!(self, Self::PropagateOnly)
374    }
375}