reth_transaction_pool/pool/
listener.rs1use crate::{
4 pool::{
5 events::{FullTransactionEvent, NewTransactionEvent, TransactionEvent},
6 QueuedReason,
7 },
8 traits::{NewBlobSidecar, PropagateKind},
9 PoolTransaction, ValidPoolTransaction,
10};
11use alloy_primitives::{TxHash, B256};
12use futures_util::Stream;
13use std::{
14 collections::{hash_map::Entry, HashMap},
15 pin::Pin,
16 sync::Arc,
17 task::{Context, Poll},
18};
19use tokio::sync::mpsc::{
20 self as mpsc, error::TrySendError, Receiver, Sender, UnboundedReceiver, UnboundedSender,
21};
22use tracing::debug;
23
24const TX_POOL_EVENT_CHANNEL_SIZE: usize = 1024;
26
27#[derive(Debug)]
29#[must_use = "streams do nothing unless polled"]
30pub struct TransactionEvents {
31 hash: TxHash,
32 events: UnboundedReceiver<TransactionEvent>,
33}
34
35impl TransactionEvents {
36 pub const fn new(hash: TxHash, events: UnboundedReceiver<TransactionEvent>) -> Self {
38 Self { hash, events }
39 }
40
41 pub const fn hash(&self) -> TxHash {
43 self.hash
44 }
45}
46
47impl Stream for TransactionEvents {
48 type Item = TransactionEvent;
49
50 fn poll_next(
51 self: std::pin::Pin<&mut Self>,
52 cx: &mut std::task::Context<'_>,
53 ) -> std::task::Poll<Option<Self::Item>> {
54 self.get_mut().events.poll_recv(cx)
55 }
56}
57
58#[derive(Debug)]
60#[must_use = "streams do nothing unless polled"]
61pub struct AllTransactionsEvents<T: PoolTransaction> {
62 pub(crate) events: Receiver<FullTransactionEvent<T>>,
63}
64
65impl<T: PoolTransaction> AllTransactionsEvents<T> {
66 pub const fn new(events: Receiver<FullTransactionEvent<T>>) -> Self {
68 Self { events }
69 }
70}
71
72impl<T: PoolTransaction> Stream for AllTransactionsEvents<T> {
73 type Item = FullTransactionEvent<T>;
74
75 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
76 self.get_mut().events.poll_recv(cx)
77 }
78}
79
80#[derive(Debug)]
85pub(crate) struct PoolEventBroadcast<T: PoolTransaction> {
86 all_events_broadcaster: AllPoolEventsBroadcaster<T>,
88 broadcasters_by_hash: HashMap<TxHash, PoolEventBroadcaster>,
90}
91
92impl<T: PoolTransaction> Default for PoolEventBroadcast<T> {
93 fn default() -> Self {
94 Self {
95 all_events_broadcaster: AllPoolEventsBroadcaster::default(),
96 broadcasters_by_hash: HashMap::default(),
97 }
98 }
99}
100
101impl<T: PoolTransaction> PoolEventBroadcast<T> {
102 fn broadcast_event(
104 &mut self,
105 hash: &TxHash,
106 event: TransactionEvent,
107 pool_event: FullTransactionEvent<T>,
108 ) {
109 if let Entry::Occupied(mut sink) = self.broadcasters_by_hash.entry(*hash) {
111 sink.get_mut().broadcast(event.clone());
112
113 if sink.get().is_empty() || event.is_final() {
114 sink.remove();
115 }
116 }
117
118 self.all_events_broadcaster.broadcast(pool_event);
120 }
121
122 #[inline]
124 pub(crate) fn is_empty(&self) -> bool {
125 self.all_events_broadcaster.is_empty() && self.broadcasters_by_hash.is_empty()
126 }
127
128 pub(crate) fn subscribe(&mut self, tx_hash: TxHash) -> TransactionEvents {
130 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
131
132 match self.broadcasters_by_hash.entry(tx_hash) {
133 Entry::Occupied(mut entry) => {
134 entry.get_mut().senders.push(tx);
135 }
136 Entry::Vacant(entry) => {
137 entry.insert(PoolEventBroadcaster { senders: vec![tx] });
138 }
139 };
140 TransactionEvents { hash: tx_hash, events: rx }
141 }
142
143 pub(crate) fn subscribe_all(&mut self) -> AllTransactionsEvents<T> {
145 let (tx, rx) = tokio::sync::mpsc::channel(TX_POOL_EVENT_CHANNEL_SIZE);
146 self.all_events_broadcaster.senders.push(tx);
147 AllTransactionsEvents::new(rx)
148 }
149
150 pub(crate) fn pending(&mut self, tx: &TxHash, replaced: Option<Arc<ValidPoolTransaction<T>>>) {
152 self.broadcast_event(tx, TransactionEvent::Pending, FullTransactionEvent::Pending(*tx));
153
154 if let Some(replaced) = replaced {
155 self.replaced(replaced, *tx);
157 }
158 }
159
160 pub(crate) fn replaced(&mut self, tx: Arc<ValidPoolTransaction<T>>, replaced_by: TxHash) {
162 let transaction = Arc::clone(&tx);
163 self.broadcast_event(
164 tx.hash(),
165 TransactionEvent::Replaced(replaced_by),
166 FullTransactionEvent::Replaced { transaction, replaced_by },
167 );
168 }
169
170 pub(crate) fn queued(&mut self, tx: &TxHash, reason: Option<QueuedReason>) {
172 self.broadcast_event(
173 tx,
174 TransactionEvent::Queued,
175 FullTransactionEvent::Queued(*tx, reason),
176 );
177 }
178
179 pub(crate) fn propagated(&mut self, tx: &TxHash, peers: Vec<PropagateKind>) {
181 let peers = Arc::new(peers);
182 self.broadcast_event(
183 tx,
184 TransactionEvent::Propagated(Arc::clone(&peers)),
185 FullTransactionEvent::Propagated(peers),
186 );
187 }
188
189 #[inline]
191 pub(crate) fn discarded_many(&mut self, discarded: &[Arc<ValidPoolTransaction<T>>]) {
192 if self.is_empty() {
193 return
194 }
195 for tx in discarded {
196 self.discarded(tx.hash());
197 }
198 }
199
200 pub(crate) fn discarded(&mut self, tx: &TxHash) {
202 self.broadcast_event(tx, TransactionEvent::Discarded, FullTransactionEvent::Discarded(*tx));
203 }
204
205 pub(crate) fn invalid(&mut self, tx: &TxHash) {
207 self.broadcast_event(tx, TransactionEvent::Invalid, FullTransactionEvent::Invalid(*tx));
208 }
209
210 pub(crate) fn mined(&mut self, tx: &TxHash, block_hash: B256) {
212 self.broadcast_event(
213 tx,
214 TransactionEvent::Mined(block_hash),
215 FullTransactionEvent::Mined { tx_hash: *tx, block_hash },
216 );
217 }
218}
219
220#[derive(Debug)]
224struct AllPoolEventsBroadcaster<T: PoolTransaction> {
225 senders: Vec<Sender<FullTransactionEvent<T>>>,
227}
228
229impl<T: PoolTransaction> Default for AllPoolEventsBroadcaster<T> {
230 fn default() -> Self {
231 Self { senders: Vec::new() }
232 }
233}
234
235impl<T: PoolTransaction> AllPoolEventsBroadcaster<T> {
236 fn broadcast(&mut self, event: FullTransactionEvent<T>) {
238 self.senders.retain(|sender| match sender.try_send(event.clone()) {
239 Ok(_) | Err(TrySendError::Full(_)) => true,
240 Err(TrySendError::Closed(_)) => false,
241 })
242 }
243
244 #[inline]
246 const fn is_empty(&self) -> bool {
247 self.senders.is_empty()
248 }
249}
250
251#[derive(Default, Debug)]
255struct PoolEventBroadcaster {
256 senders: Vec<UnboundedSender<TransactionEvent>>,
258}
259
260impl PoolEventBroadcaster {
261 const fn is_empty(&self) -> bool {
263 self.senders.is_empty()
264 }
265
266 fn broadcast(&mut self, event: TransactionEvent) {
268 self.senders.retain(|sender| sender.send(event.clone()).is_ok())
269 }
270}
271
272#[derive(Debug)]
274pub(crate) struct PendingTransactionHashListener {
275 pub(crate) sender: mpsc::Sender<TxHash>,
276 pub(crate) kind: TransactionListenerKind,
278}
279
280impl PendingTransactionHashListener {
281 pub(crate) fn send_all(&self, hashes: impl IntoIterator<Item = TxHash>) -> bool {
285 for tx_hash in hashes {
286 match self.sender.try_send(tx_hash) {
287 Ok(()) => {}
288 Err(err) => {
289 return if matches!(err, mpsc::error::TrySendError::Full(_)) {
290 debug!(
291 target: "txpool",
292 "[{:?}] failed to send pending tx; channel full",
293 tx_hash,
294 );
295 true
296 } else {
297 false
298 }
299 }
300 }
301 }
302 true
303 }
304}
305
306#[derive(Debug)]
308pub(crate) struct TransactionListener<T: PoolTransaction> {
309 pub(crate) sender: mpsc::Sender<NewTransactionEvent<T>>,
310 pub(crate) kind: TransactionListenerKind,
312}
313
314impl<T: PoolTransaction> TransactionListener<T> {
315 pub(crate) fn send(&self, event: NewTransactionEvent<T>) -> bool {
319 self.send_all(std::iter::once(event))
320 }
321
322 pub(crate) fn send_all(
326 &self,
327 events: impl IntoIterator<Item = NewTransactionEvent<T>>,
328 ) -> bool {
329 for event in events {
330 match self.sender.try_send(event) {
331 Ok(()) => {}
332 Err(err) => {
333 return if let mpsc::error::TrySendError::Full(event) = err {
334 debug!(
335 target: "txpool",
336 "[{:?}] failed to send pending tx; channel full",
337 event.transaction.hash(),
338 );
339 true
340 } else {
341 false
342 }
343 }
344 }
345 }
346 true
347 }
348}
349
350#[derive(Debug)]
352pub(crate) struct BlobTransactionSidecarListener {
353 pub(crate) sender: mpsc::Sender<NewBlobSidecar>,
354}
355
356#[derive(Debug, Copy, Clone, PartialEq, Eq)]
360pub enum TransactionListenerKind {
361 All,
363 PropagateOnly,
367}
368
369impl TransactionListenerKind {
370 #[inline]
372 pub const fn is_propagate_only(&self) -> bool {
373 matches!(self, Self::PropagateOnly)
374 }
375}