reth_transaction_pool/pool/
listener.rs1use crate::{
4 pool::events::{FullTransactionEvent, NewTransactionEvent, TransactionEvent},
5 traits::{NewBlobSidecar, PropagateKind},
6 PoolTransaction, ValidPoolTransaction,
7};
8use alloy_primitives::{TxHash, B256};
9use futures_util::Stream;
10use std::{
11 collections::{hash_map::Entry, HashMap},
12 pin::Pin,
13 sync::Arc,
14 task::{Context, Poll},
15};
16use tokio::sync::mpsc::{
17 self as mpsc, error::TrySendError, Receiver, Sender, UnboundedReceiver, UnboundedSender,
18};
19use tracing::debug;
/// Capacity of the bounded channel created for every all-transactions subscriber
/// (see `PoolEventBroadcast::subscribe_all`).
const TX_POOL_EVENT_CHANNEL_SIZE: usize = 1 << 10;
22
23#[derive(Debug)]
25#[must_use = "streams do nothing unless polled"]
26pub struct TransactionEvents {
27 hash: TxHash,
28 events: UnboundedReceiver<TransactionEvent>,
29}
30
31impl TransactionEvents {
32 pub const fn hash(&self) -> TxHash {
34 self.hash
35 }
36}
37
38impl Stream for TransactionEvents {
39 type Item = TransactionEvent;
40
41 fn poll_next(
42 self: std::pin::Pin<&mut Self>,
43 cx: &mut std::task::Context<'_>,
44 ) -> std::task::Poll<Option<Self::Item>> {
45 self.get_mut().events.poll_recv(cx)
46 }
47}
48
49#[derive(Debug)]
51#[must_use = "streams do nothing unless polled"]
52pub struct AllTransactionsEvents<T: PoolTransaction> {
53 pub(crate) events: Receiver<FullTransactionEvent<T>>,
54}
55
56impl<T: PoolTransaction> AllTransactionsEvents<T> {
57 pub const fn new(events: Receiver<FullTransactionEvent<T>>) -> Self {
59 Self { events }
60 }
61}
62
63impl<T: PoolTransaction> Stream for AllTransactionsEvents<T> {
64 type Item = FullTransactionEvent<T>;
65
66 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
67 self.get_mut().events.poll_recv(cx)
68 }
69}
70
71#[derive(Debug)]
76pub(crate) struct PoolEventBroadcast<T: PoolTransaction> {
77 all_events_broadcaster: AllPoolEventsBroadcaster<T>,
79 broadcasters_by_hash: HashMap<TxHash, PoolEventBroadcaster>,
81}
82
83impl<T: PoolTransaction> Default for PoolEventBroadcast<T> {
84 fn default() -> Self {
85 Self {
86 all_events_broadcaster: AllPoolEventsBroadcaster::default(),
87 broadcasters_by_hash: HashMap::default(),
88 }
89 }
90}
91
92impl<T: PoolTransaction> PoolEventBroadcast<T> {
93 fn broadcast_event(
95 &mut self,
96 hash: &TxHash,
97 event: TransactionEvent,
98 pool_event: FullTransactionEvent<T>,
99 ) {
100 if let Entry::Occupied(mut sink) = self.broadcasters_by_hash.entry(*hash) {
102 sink.get_mut().broadcast(event.clone());
103
104 if sink.get().is_empty() || event.is_final() {
105 sink.remove();
106 }
107 }
108
109 self.all_events_broadcaster.broadcast(pool_event);
111 }
112
113 #[inline]
115 pub(crate) fn is_empty(&self) -> bool {
116 self.all_events_broadcaster.is_empty() && self.broadcasters_by_hash.is_empty()
117 }
118
119 pub(crate) fn subscribe(&mut self, tx_hash: TxHash) -> TransactionEvents {
121 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
122
123 match self.broadcasters_by_hash.entry(tx_hash) {
124 Entry::Occupied(mut entry) => {
125 entry.get_mut().senders.push(tx);
126 }
127 Entry::Vacant(entry) => {
128 entry.insert(PoolEventBroadcaster { senders: vec![tx] });
129 }
130 };
131 TransactionEvents { hash: tx_hash, events: rx }
132 }
133
134 pub(crate) fn subscribe_all(&mut self) -> AllTransactionsEvents<T> {
136 let (tx, rx) = tokio::sync::mpsc::channel(TX_POOL_EVENT_CHANNEL_SIZE);
137 self.all_events_broadcaster.senders.push(tx);
138 AllTransactionsEvents::new(rx)
139 }
140
141 pub(crate) fn pending(&mut self, tx: &TxHash, replaced: Option<Arc<ValidPoolTransaction<T>>>) {
143 self.broadcast_event(tx, TransactionEvent::Pending, FullTransactionEvent::Pending(*tx));
144
145 if let Some(replaced) = replaced {
146 self.replaced(replaced, *tx);
148 }
149 }
150
151 pub(crate) fn replaced(&mut self, tx: Arc<ValidPoolTransaction<T>>, replaced_by: TxHash) {
153 let transaction = Arc::clone(&tx);
154 self.broadcast_event(
155 tx.hash(),
156 TransactionEvent::Replaced(replaced_by),
157 FullTransactionEvent::Replaced { transaction, replaced_by },
158 );
159 }
160
161 pub(crate) fn queued(&mut self, tx: &TxHash) {
163 self.broadcast_event(tx, TransactionEvent::Queued, FullTransactionEvent::Queued(*tx));
164 }
165
166 pub(crate) fn propagated(&mut self, tx: &TxHash, peers: Vec<PropagateKind>) {
168 let peers = Arc::new(peers);
169 self.broadcast_event(
170 tx,
171 TransactionEvent::Propagated(Arc::clone(&peers)),
172 FullTransactionEvent::Propagated(peers),
173 );
174 }
175
176 #[inline]
178 pub(crate) fn discarded_many(&mut self, discarded: &[Arc<ValidPoolTransaction<T>>]) {
179 if self.is_empty() {
180 return
181 }
182 for tx in discarded {
183 self.discarded(tx.hash());
184 }
185 }
186
187 pub(crate) fn discarded(&mut self, tx: &TxHash) {
189 self.broadcast_event(tx, TransactionEvent::Discarded, FullTransactionEvent::Discarded(*tx));
190 }
191
192 pub(crate) fn invalid(&mut self, tx: &TxHash) {
194 self.broadcast_event(tx, TransactionEvent::Invalid, FullTransactionEvent::Invalid(*tx));
195 }
196
197 pub(crate) fn mined(&mut self, tx: &TxHash, block_hash: B256) {
199 self.broadcast_event(
200 tx,
201 TransactionEvent::Mined(block_hash),
202 FullTransactionEvent::Mined { tx_hash: *tx, block_hash },
203 );
204 }
205}
206
207#[derive(Debug)]
211struct AllPoolEventsBroadcaster<T: PoolTransaction> {
212 senders: Vec<Sender<FullTransactionEvent<T>>>,
214}
215
216impl<T: PoolTransaction> Default for AllPoolEventsBroadcaster<T> {
217 fn default() -> Self {
218 Self { senders: Vec::new() }
219 }
220}
221
222impl<T: PoolTransaction> AllPoolEventsBroadcaster<T> {
223 fn broadcast(&mut self, event: FullTransactionEvent<T>) {
225 self.senders.retain(|sender| match sender.try_send(event.clone()) {
226 Ok(_) | Err(TrySendError::Full(_)) => true,
227 Err(TrySendError::Closed(_)) => false,
228 })
229 }
230
231 #[inline]
233 const fn is_empty(&self) -> bool {
234 self.senders.is_empty()
235 }
236}
237
238#[derive(Default, Debug)]
242struct PoolEventBroadcaster {
243 senders: Vec<UnboundedSender<TransactionEvent>>,
245}
246
247impl PoolEventBroadcaster {
248 const fn is_empty(&self) -> bool {
250 self.senders.is_empty()
251 }
252
253 fn broadcast(&mut self, event: TransactionEvent) {
255 self.senders.retain(|sender| sender.send(event.clone()).is_ok())
256 }
257}
258
259#[derive(Debug)]
261pub(crate) struct PendingTransactionHashListener {
262 pub(crate) sender: mpsc::Sender<TxHash>,
263 pub(crate) kind: TransactionListenerKind,
265}
266
267impl PendingTransactionHashListener {
268 pub(crate) fn send_all(&self, hashes: impl IntoIterator<Item = TxHash>) -> bool {
272 for tx_hash in hashes {
273 match self.sender.try_send(tx_hash) {
274 Ok(()) => {}
275 Err(err) => {
276 return if matches!(err, mpsc::error::TrySendError::Full(_)) {
277 debug!(
278 target: "txpool",
279 "[{:?}] failed to send pending tx; channel full",
280 tx_hash,
281 );
282 true
283 } else {
284 false
285 }
286 }
287 }
288 }
289 true
290 }
291}
292
293#[derive(Debug)]
295pub(crate) struct TransactionListener<T: PoolTransaction> {
296 pub(crate) sender: mpsc::Sender<NewTransactionEvent<T>>,
297 pub(crate) kind: TransactionListenerKind,
299}
300
301impl<T: PoolTransaction> TransactionListener<T> {
302 pub(crate) fn send(&self, event: NewTransactionEvent<T>) -> bool {
306 self.send_all(std::iter::once(event))
307 }
308
309 pub(crate) fn send_all(
313 &self,
314 events: impl IntoIterator<Item = NewTransactionEvent<T>>,
315 ) -> bool {
316 for event in events {
317 match self.sender.try_send(event) {
318 Ok(()) => {}
319 Err(err) => {
320 return if let mpsc::error::TrySendError::Full(event) = err {
321 debug!(
322 target: "txpool",
323 "[{:?}] failed to send pending tx; channel full",
324 event.transaction.hash(),
325 );
326 true
327 } else {
328 false
329 }
330 }
331 }
332 }
333 true
334 }
335}
336
337#[derive(Debug)]
339pub(crate) struct BlobTransactionSidecarListener {
340 pub(crate) sender: mpsc::Sender<NewBlobSidecar>,
341}
342
/// Selects which transactions a listener is interested in.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TransactionListenerKind {
    /// No restriction on the transactions reported (inferred from the name; confirm
    /// against the call sites).
    All,
    /// Presumably restricts the listener to transactions that may be propagated
    /// (inferred from the name; confirm against the call sites).
    PropagateOnly,
}

impl TransactionListenerKind {
    /// Returns `true` if this is the [`Self::PropagateOnly`] kind.
    #[inline]
    pub const fn is_propagate_only(&self) -> bool {
        match self {
            Self::PropagateOnly => true,
            Self::All => false,
        }
    }
}
362}