reth_transaction_pool/pool/
listener.rs

1//! Listeners for the transaction-pool
2
3use crate::{
4    pool::events::{FullTransactionEvent, NewTransactionEvent, TransactionEvent},
5    traits::{NewBlobSidecar, PropagateKind},
6    PoolTransaction, ValidPoolTransaction,
7};
8use alloy_primitives::{TxHash, B256};
9use futures_util::Stream;
10use std::{
11    collections::{hash_map::Entry, HashMap},
12    pin::Pin,
13    sync::Arc,
14    task::{Context, Poll},
15};
16use tokio::sync::mpsc::{
17    self as mpsc, error::TrySendError, Receiver, Sender, UnboundedReceiver, UnboundedSender,
18};
19use tracing::debug;
20/// The size of the event channel used to propagate transaction events.
21const TX_POOL_EVENT_CHANNEL_SIZE: usize = 1024;
22
23/// A Stream that receives [`TransactionEvent`] only for the transaction with the given hash.
24#[derive(Debug)]
25#[must_use = "streams do nothing unless polled"]
26pub struct TransactionEvents {
27    hash: TxHash,
28    events: UnboundedReceiver<TransactionEvent>,
29}
30
31impl TransactionEvents {
32    /// The hash for this transaction
33    pub const fn hash(&self) -> TxHash {
34        self.hash
35    }
36}
37
38impl Stream for TransactionEvents {
39    type Item = TransactionEvent;
40
41    fn poll_next(
42        self: std::pin::Pin<&mut Self>,
43        cx: &mut std::task::Context<'_>,
44    ) -> std::task::Poll<Option<Self::Item>> {
45        self.get_mut().events.poll_recv(cx)
46    }
47}
48
49/// A Stream that receives [`FullTransactionEvent`] for _all_ transaction.
50#[derive(Debug)]
51#[must_use = "streams do nothing unless polled"]
52pub struct AllTransactionsEvents<T: PoolTransaction> {
53    pub(crate) events: Receiver<FullTransactionEvent<T>>,
54}
55
56impl<T: PoolTransaction> AllTransactionsEvents<T> {
57    /// Create a new instance of this stream.
58    pub const fn new(events: Receiver<FullTransactionEvent<T>>) -> Self {
59        Self { events }
60    }
61}
62
63impl<T: PoolTransaction> Stream for AllTransactionsEvents<T> {
64    type Item = FullTransactionEvent<T>;
65
66    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
67        self.get_mut().events.poll_recv(cx)
68    }
69}
70
71/// A type that broadcasts [`TransactionEvent`] to installed listeners.
72///
73/// This is essentially a multi-producer, multi-consumer channel where each event is broadcast to
74/// all active receivers.
75#[derive(Debug)]
76pub(crate) struct PoolEventBroadcast<T: PoolTransaction> {
77    /// All listeners for all transaction events.
78    all_events_broadcaster: AllPoolEventsBroadcaster<T>,
79    /// All listeners for events for a certain transaction hash.
80    broadcasters_by_hash: HashMap<TxHash, PoolEventBroadcaster>,
81}
82
83impl<T: PoolTransaction> Default for PoolEventBroadcast<T> {
84    fn default() -> Self {
85        Self {
86            all_events_broadcaster: AllPoolEventsBroadcaster::default(),
87            broadcasters_by_hash: HashMap::default(),
88        }
89    }
90}
91
92impl<T: PoolTransaction> PoolEventBroadcast<T> {
93    /// Calls the broadcast callback with the `PoolEventBroadcaster` that belongs to the hash.
94    fn broadcast_event(
95        &mut self,
96        hash: &TxHash,
97        event: TransactionEvent,
98        pool_event: FullTransactionEvent<T>,
99    ) {
100        // Broadcast to all listeners for the transaction hash.
101        if let Entry::Occupied(mut sink) = self.broadcasters_by_hash.entry(*hash) {
102            sink.get_mut().broadcast(event.clone());
103
104            if sink.get().is_empty() || event.is_final() {
105                sink.remove();
106            }
107        }
108
109        // Broadcast to all listeners for all transactions.
110        self.all_events_broadcaster.broadcast(pool_event);
111    }
112
113    /// Create a new subscription for the given transaction hash.
114    pub(crate) fn subscribe(&mut self, tx_hash: TxHash) -> TransactionEvents {
115        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
116
117        match self.broadcasters_by_hash.entry(tx_hash) {
118            Entry::Occupied(mut entry) => {
119                entry.get_mut().senders.push(tx);
120            }
121            Entry::Vacant(entry) => {
122                entry.insert(PoolEventBroadcaster { senders: vec![tx] });
123            }
124        };
125        TransactionEvents { hash: tx_hash, events: rx }
126    }
127
128    /// Create a new subscription for all transactions.
129    pub(crate) fn subscribe_all(&mut self) -> AllTransactionsEvents<T> {
130        let (tx, rx) = tokio::sync::mpsc::channel(TX_POOL_EVENT_CHANNEL_SIZE);
131        self.all_events_broadcaster.senders.push(tx);
132        AllTransactionsEvents::new(rx)
133    }
134
135    /// Notify listeners about a transaction that was added to the pending queue.
136    pub(crate) fn pending(&mut self, tx: &TxHash, replaced: Option<Arc<ValidPoolTransaction<T>>>) {
137        self.broadcast_event(tx, TransactionEvent::Pending, FullTransactionEvent::Pending(*tx));
138
139        if let Some(replaced) = replaced {
140            // notify listeners that this transaction was replaced
141            self.replaced(replaced, *tx);
142        }
143    }
144
145    /// Notify listeners about a transaction that was replaced.
146    pub(crate) fn replaced(&mut self, tx: Arc<ValidPoolTransaction<T>>, replaced_by: TxHash) {
147        let transaction = Arc::clone(&tx);
148        self.broadcast_event(
149            tx.hash(),
150            TransactionEvent::Replaced(replaced_by),
151            FullTransactionEvent::Replaced { transaction, replaced_by },
152        );
153    }
154
155    /// Notify listeners about a transaction that was added to the queued pool.
156    pub(crate) fn queued(&mut self, tx: &TxHash) {
157        self.broadcast_event(tx, TransactionEvent::Queued, FullTransactionEvent::Queued(*tx));
158    }
159
160    /// Notify listeners about a transaction that was propagated.
161    pub(crate) fn propagated(&mut self, tx: &TxHash, peers: Vec<PropagateKind>) {
162        let peers = Arc::new(peers);
163        self.broadcast_event(
164            tx,
165            TransactionEvent::Propagated(Arc::clone(&peers)),
166            FullTransactionEvent::Propagated(peers),
167        );
168    }
169
170    /// Notify listeners about a transaction that was discarded.
171    pub(crate) fn discarded(&mut self, tx: &TxHash) {
172        self.broadcast_event(tx, TransactionEvent::Discarded, FullTransactionEvent::Discarded(*tx));
173    }
174
175    /// Notify listeners about a transaction that was invalid.
176    pub(crate) fn invalid(&mut self, tx: &TxHash) {
177        self.broadcast_event(tx, TransactionEvent::Invalid, FullTransactionEvent::Invalid(*tx));
178    }
179
180    /// Notify listeners that the transaction was mined
181    pub(crate) fn mined(&mut self, tx: &TxHash, block_hash: B256) {
182        self.broadcast_event(
183            tx,
184            TransactionEvent::Mined(block_hash),
185            FullTransactionEvent::Mined { tx_hash: *tx, block_hash },
186        );
187    }
188}
189
190/// All Sender half(s) of the event channels for all transactions.
191///
192/// This mimics [`tokio::sync::broadcast`] but uses separate channels.
193#[derive(Debug)]
194struct AllPoolEventsBroadcaster<T: PoolTransaction> {
195    /// Corresponding sender half(s) for event listener channel
196    senders: Vec<Sender<FullTransactionEvent<T>>>,
197}
198
199impl<T: PoolTransaction> Default for AllPoolEventsBroadcaster<T> {
200    fn default() -> Self {
201        Self { senders: Vec::new() }
202    }
203}
204
205impl<T: PoolTransaction> AllPoolEventsBroadcaster<T> {
206    // Broadcast an event to all listeners. Dropped listeners are silently evicted.
207    fn broadcast(&mut self, event: FullTransactionEvent<T>) {
208        self.senders.retain(|sender| match sender.try_send(event.clone()) {
209            Ok(_) | Err(TrySendError::Full(_)) => true,
210            Err(TrySendError::Closed(_)) => false,
211        })
212    }
213}
214
215/// All Sender half(s) of the event channels for a specific transaction.
216///
217/// This mimics [`tokio::sync::broadcast`] but uses separate channels and is unbounded.
218#[derive(Default, Debug)]
219struct PoolEventBroadcaster {
220    /// Corresponding sender half(s) for event listener channel
221    senders: Vec<UnboundedSender<TransactionEvent>>,
222}
223
224impl PoolEventBroadcaster {
225    /// Returns `true` if there are no more listeners remaining.
226    fn is_empty(&self) -> bool {
227        self.senders.is_empty()
228    }
229
230    // Broadcast an event to all listeners. Dropped listeners are silently evicted.
231    fn broadcast(&mut self, event: TransactionEvent) {
232        self.senders.retain(|sender| sender.send(event.clone()).is_ok())
233    }
234}
235
236/// An active listener for new pending transactions.
237#[derive(Debug)]
238pub(crate) struct PendingTransactionHashListener {
239    pub(crate) sender: mpsc::Sender<TxHash>,
240    /// Whether to include transactions that should not be propagated over the network.
241    pub(crate) kind: TransactionListenerKind,
242}
243
244impl PendingTransactionHashListener {
245    /// Attempts to send all hashes to the listener.
246    ///
247    /// Returns false if the channel is closed (receiver dropped)
248    pub(crate) fn send_all(&self, hashes: impl IntoIterator<Item = TxHash>) -> bool {
249        for tx_hash in hashes {
250            match self.sender.try_send(tx_hash) {
251                Ok(()) => {}
252                Err(err) => {
253                    return if matches!(err, mpsc::error::TrySendError::Full(_)) {
254                        debug!(
255                            target: "txpool",
256                            "[{:?}] failed to send pending tx; channel full",
257                            tx_hash,
258                        );
259                        true
260                    } else {
261                        false
262                    }
263                }
264            }
265        }
266        true
267    }
268}
269
270/// An active listener for new pending transactions.
271#[derive(Debug)]
272pub(crate) struct TransactionListener<T: PoolTransaction> {
273    pub(crate) sender: mpsc::Sender<NewTransactionEvent<T>>,
274    /// Whether to include transactions that should not be propagated over the network.
275    pub(crate) kind: TransactionListenerKind,
276}
277
278impl<T: PoolTransaction> TransactionListener<T> {
279    /// Attempts to send the event to the listener.
280    ///
281    /// Returns false if the channel is closed (receiver dropped)
282    pub(crate) fn send(&self, event: NewTransactionEvent<T>) -> bool {
283        self.send_all(std::iter::once(event))
284    }
285
286    /// Attempts to send all events to the listener.
287    ///
288    /// Returns false if the channel is closed (receiver dropped)
289    pub(crate) fn send_all(
290        &self,
291        events: impl IntoIterator<Item = NewTransactionEvent<T>>,
292    ) -> bool {
293        for event in events {
294            match self.sender.try_send(event) {
295                Ok(()) => {}
296                Err(err) => {
297                    return if let mpsc::error::TrySendError::Full(event) = err {
298                        debug!(
299                            target: "txpool",
300                            "[{:?}] failed to send pending tx; channel full",
301                            event.transaction.hash(),
302                        );
303                        true
304                    } else {
305                        false
306                    }
307                }
308            }
309        }
310        true
311    }
312}
313
314/// An active listener for new blobs
315#[derive(Debug)]
316pub(crate) struct BlobTransactionSidecarListener {
317    pub(crate) sender: mpsc::Sender<NewBlobSidecar>,
318}
319
320/// Determines what kind of new transactions should be emitted by a stream of transactions.
321///
322/// This gives control whether to include transactions that are allowed to be propagated.
323#[derive(Debug, Copy, Clone, PartialEq, Eq)]
324pub enum TransactionListenerKind {
325    /// Any new pending transactions
326    All,
327    /// Only transactions that are allowed to be propagated.
328    ///
329    /// See also [`ValidPoolTransaction`]
330    PropagateOnly,
331}
332
333impl TransactionListenerKind {
334    /// Returns true if we're only interested in transactions that are allowed to be propagated.
335    #[inline]
336    pub const fn is_propagate_only(&self) -> bool {
337        matches!(self, Self::PropagateOnly)
338    }
339}