reth_transaction_pool/pool/
listener.rs

//! Event listeners and broadcast channels for the transaction pool.
2
3use crate::{
4    pool::events::{FullTransactionEvent, NewTransactionEvent, TransactionEvent},
5    traits::{NewBlobSidecar, PropagateKind},
6    PoolTransaction, ValidPoolTransaction,
7};
8use alloy_primitives::{TxHash, B256};
9use futures_util::Stream;
10use std::{
11    collections::{hash_map::Entry, HashMap},
12    pin::Pin,
13    sync::Arc,
14    task::{Context, Poll},
15};
16use tokio::sync::mpsc::{
17    self as mpsc, error::TrySendError, Receiver, Sender, UnboundedReceiver, UnboundedSender,
18};
19use tracing::debug;
/// Capacity of the bounded channel created for each subscriber to the events of all
/// transactions.
///
/// While a subscriber's channel is full, new events are not queued for that subscriber.
const TX_POOL_EVENT_CHANNEL_SIZE: usize = 1024;
22
/// A [`Stream`] that yields the [`TransactionEvent`]s of exactly one transaction, identified by
/// its hash.
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct TransactionEvents {
    /// Hash of the transaction this stream is subscribed to.
    hash: TxHash,
    /// Receiving half of the unbounded channel on which the events arrive.
    events: UnboundedReceiver<TransactionEvent>,
}
30
31impl TransactionEvents {
32    /// The hash for this transaction
33    pub const fn hash(&self) -> TxHash {
34        self.hash
35    }
36}
37
38impl Stream for TransactionEvents {
39    type Item = TransactionEvent;
40
41    fn poll_next(
42        self: std::pin::Pin<&mut Self>,
43        cx: &mut std::task::Context<'_>,
44    ) -> std::task::Poll<Option<Self::Item>> {
45        self.get_mut().events.poll_recv(cx)
46    }
47}
48
/// A [`Stream`] that yields a [`FullTransactionEvent`] for the events of _all_ transactions.
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct AllTransactionsEvents<T: PoolTransaction> {
    /// Receiving half of the channel on which the events arrive.
    pub(crate) events: Receiver<FullTransactionEvent<T>>,
}
55
impl<T: PoolTransaction> AllTransactionsEvents<T> {
    /// Creates a stream that yields whatever arrives on the given receiver.
    pub const fn new(events: Receiver<FullTransactionEvent<T>>) -> Self {
        Self { events }
    }
}
62
63impl<T: PoolTransaction> Stream for AllTransactionsEvents<T> {
64    type Item = FullTransactionEvent<T>;
65
66    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
67        self.get_mut().events.poll_recv(cx)
68    }
69}
70
/// A type that broadcasts [`TransactionEvent`] to installed listeners.
///
/// This is essentially a multi-producer, multi-consumer channel where each event is broadcast to
/// all active receivers.
///
/// Two groups of listeners are tracked: listeners for the events of every transaction, and
/// listeners that subscribed to one specific transaction hash.
#[derive(Debug)]
pub(crate) struct PoolEventBroadcast<T: PoolTransaction> {
    /// Senders for the listeners that receive the events of all transactions.
    all_events_broadcaster: AllPoolEventsBroadcaster<T>,
    /// Senders for the listeners of a single transaction, keyed by that transaction's hash.
    broadcasters_by_hash: HashMap<TxHash, PoolEventBroadcaster>,
}
82
83impl<T: PoolTransaction> Default for PoolEventBroadcast<T> {
84    fn default() -> Self {
85        Self {
86            all_events_broadcaster: AllPoolEventsBroadcaster::default(),
87            broadcasters_by_hash: HashMap::default(),
88        }
89    }
90}
91
92impl<T: PoolTransaction> PoolEventBroadcast<T> {
93    /// Calls the broadcast callback with the `PoolEventBroadcaster` that belongs to the hash.
94    fn broadcast_event(
95        &mut self,
96        hash: &TxHash,
97        event: TransactionEvent,
98        pool_event: FullTransactionEvent<T>,
99    ) {
100        // Broadcast to all listeners for the transaction hash.
101        if let Entry::Occupied(mut sink) = self.broadcasters_by_hash.entry(*hash) {
102            sink.get_mut().broadcast(event.clone());
103
104            if sink.get().is_empty() || event.is_final() {
105                sink.remove();
106            }
107        }
108
109        // Broadcast to all listeners for all transactions.
110        self.all_events_broadcaster.broadcast(pool_event);
111    }
112
113    /// Returns true if no listeners are installed
114    #[inline]
115    pub(crate) fn is_empty(&self) -> bool {
116        self.all_events_broadcaster.is_empty() && self.broadcasters_by_hash.is_empty()
117    }
118
119    /// Create a new subscription for the given transaction hash.
120    pub(crate) fn subscribe(&mut self, tx_hash: TxHash) -> TransactionEvents {
121        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
122
123        match self.broadcasters_by_hash.entry(tx_hash) {
124            Entry::Occupied(mut entry) => {
125                entry.get_mut().senders.push(tx);
126            }
127            Entry::Vacant(entry) => {
128                entry.insert(PoolEventBroadcaster { senders: vec![tx] });
129            }
130        };
131        TransactionEvents { hash: tx_hash, events: rx }
132    }
133
134    /// Create a new subscription for all transactions.
135    pub(crate) fn subscribe_all(&mut self) -> AllTransactionsEvents<T> {
136        let (tx, rx) = tokio::sync::mpsc::channel(TX_POOL_EVENT_CHANNEL_SIZE);
137        self.all_events_broadcaster.senders.push(tx);
138        AllTransactionsEvents::new(rx)
139    }
140
141    /// Notify listeners about a transaction that was added to the pending queue.
142    pub(crate) fn pending(&mut self, tx: &TxHash, replaced: Option<Arc<ValidPoolTransaction<T>>>) {
143        self.broadcast_event(tx, TransactionEvent::Pending, FullTransactionEvent::Pending(*tx));
144
145        if let Some(replaced) = replaced {
146            // notify listeners that this transaction was replaced
147            self.replaced(replaced, *tx);
148        }
149    }
150
151    /// Notify listeners about a transaction that was replaced.
152    pub(crate) fn replaced(&mut self, tx: Arc<ValidPoolTransaction<T>>, replaced_by: TxHash) {
153        let transaction = Arc::clone(&tx);
154        self.broadcast_event(
155            tx.hash(),
156            TransactionEvent::Replaced(replaced_by),
157            FullTransactionEvent::Replaced { transaction, replaced_by },
158        );
159    }
160
161    /// Notify listeners about a transaction that was added to the queued pool.
162    pub(crate) fn queued(&mut self, tx: &TxHash) {
163        self.broadcast_event(tx, TransactionEvent::Queued, FullTransactionEvent::Queued(*tx));
164    }
165
166    /// Notify listeners about a transaction that was propagated.
167    pub(crate) fn propagated(&mut self, tx: &TxHash, peers: Vec<PropagateKind>) {
168        let peers = Arc::new(peers);
169        self.broadcast_event(
170            tx,
171            TransactionEvent::Propagated(Arc::clone(&peers)),
172            FullTransactionEvent::Propagated(peers),
173        );
174    }
175
176    /// Notify listeners about all discarded transactions.
177    #[inline]
178    pub(crate) fn discarded_many(&mut self, discarded: &[Arc<ValidPoolTransaction<T>>]) {
179        if self.is_empty() {
180            return
181        }
182        for tx in discarded {
183            self.discarded(tx.hash());
184        }
185    }
186
187    /// Notify listeners about a transaction that was discarded.
188    pub(crate) fn discarded(&mut self, tx: &TxHash) {
189        self.broadcast_event(tx, TransactionEvent::Discarded, FullTransactionEvent::Discarded(*tx));
190    }
191
192    /// Notify listeners about a transaction that was invalid.
193    pub(crate) fn invalid(&mut self, tx: &TxHash) {
194        self.broadcast_event(tx, TransactionEvent::Invalid, FullTransactionEvent::Invalid(*tx));
195    }
196
197    /// Notify listeners that the transaction was mined
198    pub(crate) fn mined(&mut self, tx: &TxHash, block_hash: B256) {
199        self.broadcast_event(
200            tx,
201            TransactionEvent::Mined(block_hash),
202            FullTransactionEvent::Mined { tx_hash: *tx, block_hash },
203        );
204    }
205}
206
/// All Sender half(s) of the event channels for all transactions.
///
/// This mimics [`tokio::sync::broadcast`] but uses separate channels.
#[derive(Debug)]
struct AllPoolEventsBroadcaster<T: PoolTransaction> {
    /// Sender halves of the event channels, one per installed listener.
    senders: Vec<Sender<FullTransactionEvent<T>>>,
}
215
216impl<T: PoolTransaction> Default for AllPoolEventsBroadcaster<T> {
217    fn default() -> Self {
218        Self { senders: Vec::new() }
219    }
220}
221
222impl<T: PoolTransaction> AllPoolEventsBroadcaster<T> {
223    // Broadcast an event to all listeners. Dropped listeners are silently evicted.
224    fn broadcast(&mut self, event: FullTransactionEvent<T>) {
225        self.senders.retain(|sender| match sender.try_send(event.clone()) {
226            Ok(_) | Err(TrySendError::Full(_)) => true,
227            Err(TrySendError::Closed(_)) => false,
228        })
229    }
230
231    /// Returns true if there are no listeners installed.
232    #[inline]
233    const fn is_empty(&self) -> bool {
234        self.senders.is_empty()
235    }
236}
237
/// All Sender half(s) of the event channels for a specific transaction.
///
/// This mimics [`tokio::sync::broadcast`] but uses separate channels and is unbounded.
#[derive(Default, Debug)]
struct PoolEventBroadcaster {
    /// Sender halves of the event channels, one per installed listener.
    senders: Vec<UnboundedSender<TransactionEvent>>,
}
246
247impl PoolEventBroadcaster {
248    /// Returns `true` if there are no more listeners remaining.
249    const fn is_empty(&self) -> bool {
250        self.senders.is_empty()
251    }
252
253    // Broadcast an event to all listeners. Dropped listeners are silently evicted.
254    fn broadcast(&mut self, event: TransactionEvent) {
255        self.senders.retain(|sender| sender.send(event.clone()).is_ok())
256    }
257}
258
/// An active listener for the hashes of new pending transactions.
#[derive(Debug)]
pub(crate) struct PendingTransactionHashListener {
    /// Sender half of the channel the transaction hashes are written to.
    pub(crate) sender: mpsc::Sender<TxHash>,
    /// Whether to include transactions that should not be propagated over the network.
    pub(crate) kind: TransactionListenerKind,
}
266
267impl PendingTransactionHashListener {
268    /// Attempts to send all hashes to the listener.
269    ///
270    /// Returns false if the channel is closed (receiver dropped)
271    pub(crate) fn send_all(&self, hashes: impl IntoIterator<Item = TxHash>) -> bool {
272        for tx_hash in hashes {
273            match self.sender.try_send(tx_hash) {
274                Ok(()) => {}
275                Err(err) => {
276                    return if matches!(err, mpsc::error::TrySendError::Full(_)) {
277                        debug!(
278                            target: "txpool",
279                            "[{:?}] failed to send pending tx; channel full",
280                            tx_hash,
281                        );
282                        true
283                    } else {
284                        false
285                    }
286                }
287            }
288        }
289        true
290    }
291}
292
/// An active listener for [`NewTransactionEvent`]s.
#[derive(Debug)]
pub(crate) struct TransactionListener<T: PoolTransaction> {
    /// Sender half of the channel the events are written to.
    pub(crate) sender: mpsc::Sender<NewTransactionEvent<T>>,
    /// Whether to include transactions that should not be propagated over the network.
    pub(crate) kind: TransactionListenerKind,
}
300
301impl<T: PoolTransaction> TransactionListener<T> {
302    /// Attempts to send the event to the listener.
303    ///
304    /// Returns false if the channel is closed (receiver dropped)
305    pub(crate) fn send(&self, event: NewTransactionEvent<T>) -> bool {
306        self.send_all(std::iter::once(event))
307    }
308
309    /// Attempts to send all events to the listener.
310    ///
311    /// Returns false if the channel is closed (receiver dropped)
312    pub(crate) fn send_all(
313        &self,
314        events: impl IntoIterator<Item = NewTransactionEvent<T>>,
315    ) -> bool {
316        for event in events {
317            match self.sender.try_send(event) {
318                Ok(()) => {}
319                Err(err) => {
320                    return if let mpsc::error::TrySendError::Full(event) = err {
321                        debug!(
322                            target: "txpool",
323                            "[{:?}] failed to send pending tx; channel full",
324                            event.transaction.hash(),
325                        );
326                        true
327                    } else {
328                        false
329                    }
330                }
331            }
332        }
333        true
334    }
335}
336
/// An active listener for new blob sidecars.
#[derive(Debug)]
pub(crate) struct BlobTransactionSidecarListener {
    /// Sender half of the channel the [`NewBlobSidecar`] values are written to.
    pub(crate) sender: mpsc::Sender<NewBlobSidecar>,
}
342
/// Selects which new transactions a stream of transactions should emit.
///
/// This controls whether the stream is restricted to transactions that are allowed to be
/// propagated.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum TransactionListenerKind {
    /// Any new pending transactions
    All,
    /// Only transactions that are allowed to be propagated.
    ///
    /// See also [`ValidPoolTransaction`]
    PropagateOnly,
}

impl TransactionListenerKind {
    /// Returns `true` for [`Self::PropagateOnly`], i.e. if only transactions that are allowed to
    /// be propagated are of interest.
    #[inline]
    pub const fn is_propagate_only(&self) -> bool {
        match self {
            Self::PropagateOnly => true,
            Self::All => false,
        }
    }
}