reth_transaction_pool/pool/
listener.rs1use crate::{
4 pool::events::{FullTransactionEvent, NewTransactionEvent, TransactionEvent},
5 traits::{NewBlobSidecar, PropagateKind},
6 PoolTransaction, ValidPoolTransaction,
7};
8use alloy_primitives::{TxHash, B256};
9use futures_util::Stream;
10use std::{
11 collections::{hash_map::Entry, HashMap},
12 pin::Pin,
13 sync::Arc,
14 task::{Context, Poll},
15};
16use tokio::sync::mpsc::{
17 self as mpsc, error::TrySendError, Receiver, Sender, UnboundedReceiver, UnboundedSender,
18};
19use tracing::debug;
20const TX_POOL_EVENT_CHANNEL_SIZE: usize = 1024;
22
23#[derive(Debug)]
25#[must_use = "streams do nothing unless polled"]
26pub struct TransactionEvents {
27 hash: TxHash,
28 events: UnboundedReceiver<TransactionEvent>,
29}
30
31impl TransactionEvents {
32 pub const fn new(hash: TxHash, events: UnboundedReceiver<TransactionEvent>) -> Self {
34 Self { hash, events }
35 }
36
37 pub const fn hash(&self) -> TxHash {
39 self.hash
40 }
41}
42
43impl Stream for TransactionEvents {
44 type Item = TransactionEvent;
45
46 fn poll_next(
47 self: std::pin::Pin<&mut Self>,
48 cx: &mut std::task::Context<'_>,
49 ) -> std::task::Poll<Option<Self::Item>> {
50 self.get_mut().events.poll_recv(cx)
51 }
52}
53
54#[derive(Debug)]
56#[must_use = "streams do nothing unless polled"]
57pub struct AllTransactionsEvents<T: PoolTransaction> {
58 pub(crate) events: Receiver<FullTransactionEvent<T>>,
59}
60
61impl<T: PoolTransaction> AllTransactionsEvents<T> {
62 pub const fn new(events: Receiver<FullTransactionEvent<T>>) -> Self {
64 Self { events }
65 }
66}
67
68impl<T: PoolTransaction> Stream for AllTransactionsEvents<T> {
69 type Item = FullTransactionEvent<T>;
70
71 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
72 self.get_mut().events.poll_recv(cx)
73 }
74}
75
76#[derive(Debug)]
81pub(crate) struct PoolEventBroadcast<T: PoolTransaction> {
82 all_events_broadcaster: AllPoolEventsBroadcaster<T>,
84 broadcasters_by_hash: HashMap<TxHash, PoolEventBroadcaster>,
86}
87
88impl<T: PoolTransaction> Default for PoolEventBroadcast<T> {
89 fn default() -> Self {
90 Self {
91 all_events_broadcaster: AllPoolEventsBroadcaster::default(),
92 broadcasters_by_hash: HashMap::default(),
93 }
94 }
95}
96
97impl<T: PoolTransaction> PoolEventBroadcast<T> {
98 fn broadcast_event(
100 &mut self,
101 hash: &TxHash,
102 event: TransactionEvent,
103 pool_event: FullTransactionEvent<T>,
104 ) {
105 if let Entry::Occupied(mut sink) = self.broadcasters_by_hash.entry(*hash) {
107 sink.get_mut().broadcast(event.clone());
108
109 if sink.get().is_empty() || event.is_final() {
110 sink.remove();
111 }
112 }
113
114 self.all_events_broadcaster.broadcast(pool_event);
116 }
117
118 #[inline]
120 pub(crate) fn is_empty(&self) -> bool {
121 self.all_events_broadcaster.is_empty() && self.broadcasters_by_hash.is_empty()
122 }
123
124 pub(crate) fn subscribe(&mut self, tx_hash: TxHash) -> TransactionEvents {
126 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
127
128 match self.broadcasters_by_hash.entry(tx_hash) {
129 Entry::Occupied(mut entry) => {
130 entry.get_mut().senders.push(tx);
131 }
132 Entry::Vacant(entry) => {
133 entry.insert(PoolEventBroadcaster { senders: vec![tx] });
134 }
135 };
136 TransactionEvents { hash: tx_hash, events: rx }
137 }
138
139 pub(crate) fn subscribe_all(&mut self) -> AllTransactionsEvents<T> {
141 let (tx, rx) = tokio::sync::mpsc::channel(TX_POOL_EVENT_CHANNEL_SIZE);
142 self.all_events_broadcaster.senders.push(tx);
143 AllTransactionsEvents::new(rx)
144 }
145
146 pub(crate) fn pending(&mut self, tx: &TxHash, replaced: Option<Arc<ValidPoolTransaction<T>>>) {
148 self.broadcast_event(tx, TransactionEvent::Pending, FullTransactionEvent::Pending(*tx));
149
150 if let Some(replaced) = replaced {
151 self.replaced(replaced, *tx);
153 }
154 }
155
156 pub(crate) fn replaced(&mut self, tx: Arc<ValidPoolTransaction<T>>, replaced_by: TxHash) {
158 let transaction = Arc::clone(&tx);
159 self.broadcast_event(
160 tx.hash(),
161 TransactionEvent::Replaced(replaced_by),
162 FullTransactionEvent::Replaced { transaction, replaced_by },
163 );
164 }
165
166 pub(crate) fn queued(&mut self, tx: &TxHash) {
168 self.broadcast_event(tx, TransactionEvent::Queued, FullTransactionEvent::Queued(*tx));
169 }
170
171 pub(crate) fn propagated(&mut self, tx: &TxHash, peers: Vec<PropagateKind>) {
173 let peers = Arc::new(peers);
174 self.broadcast_event(
175 tx,
176 TransactionEvent::Propagated(Arc::clone(&peers)),
177 FullTransactionEvent::Propagated(peers),
178 );
179 }
180
181 #[inline]
183 pub(crate) fn discarded_many(&mut self, discarded: &[Arc<ValidPoolTransaction<T>>]) {
184 if self.is_empty() {
185 return
186 }
187 for tx in discarded {
188 self.discarded(tx.hash());
189 }
190 }
191
192 pub(crate) fn discarded(&mut self, tx: &TxHash) {
194 self.broadcast_event(tx, TransactionEvent::Discarded, FullTransactionEvent::Discarded(*tx));
195 }
196
197 pub(crate) fn invalid(&mut self, tx: &TxHash) {
199 self.broadcast_event(tx, TransactionEvent::Invalid, FullTransactionEvent::Invalid(*tx));
200 }
201
202 pub(crate) fn mined(&mut self, tx: &TxHash, block_hash: B256) {
204 self.broadcast_event(
205 tx,
206 TransactionEvent::Mined(block_hash),
207 FullTransactionEvent::Mined { tx_hash: *tx, block_hash },
208 );
209 }
210}
211
212#[derive(Debug)]
216struct AllPoolEventsBroadcaster<T: PoolTransaction> {
217 senders: Vec<Sender<FullTransactionEvent<T>>>,
219}
220
221impl<T: PoolTransaction> Default for AllPoolEventsBroadcaster<T> {
222 fn default() -> Self {
223 Self { senders: Vec::new() }
224 }
225}
226
227impl<T: PoolTransaction> AllPoolEventsBroadcaster<T> {
228 fn broadcast(&mut self, event: FullTransactionEvent<T>) {
230 self.senders.retain(|sender| match sender.try_send(event.clone()) {
231 Ok(_) | Err(TrySendError::Full(_)) => true,
232 Err(TrySendError::Closed(_)) => false,
233 })
234 }
235
236 #[inline]
238 const fn is_empty(&self) -> bool {
239 self.senders.is_empty()
240 }
241}
242
243#[derive(Default, Debug)]
247struct PoolEventBroadcaster {
248 senders: Vec<UnboundedSender<TransactionEvent>>,
250}
251
252impl PoolEventBroadcaster {
253 const fn is_empty(&self) -> bool {
255 self.senders.is_empty()
256 }
257
258 fn broadcast(&mut self, event: TransactionEvent) {
260 self.senders.retain(|sender| sender.send(event.clone()).is_ok())
261 }
262}
263
264#[derive(Debug)]
266pub(crate) struct PendingTransactionHashListener {
267 pub(crate) sender: mpsc::Sender<TxHash>,
268 pub(crate) kind: TransactionListenerKind,
270}
271
272impl PendingTransactionHashListener {
273 pub(crate) fn send_all(&self, hashes: impl IntoIterator<Item = TxHash>) -> bool {
277 for tx_hash in hashes {
278 match self.sender.try_send(tx_hash) {
279 Ok(()) => {}
280 Err(err) => {
281 return if matches!(err, mpsc::error::TrySendError::Full(_)) {
282 debug!(
283 target: "txpool",
284 "[{:?}] failed to send pending tx; channel full",
285 tx_hash,
286 );
287 true
288 } else {
289 false
290 }
291 }
292 }
293 }
294 true
295 }
296}
297
298#[derive(Debug)]
300pub(crate) struct TransactionListener<T: PoolTransaction> {
301 pub(crate) sender: mpsc::Sender<NewTransactionEvent<T>>,
302 pub(crate) kind: TransactionListenerKind,
304}
305
306impl<T: PoolTransaction> TransactionListener<T> {
307 pub(crate) fn send(&self, event: NewTransactionEvent<T>) -> bool {
311 self.send_all(std::iter::once(event))
312 }
313
314 pub(crate) fn send_all(
318 &self,
319 events: impl IntoIterator<Item = NewTransactionEvent<T>>,
320 ) -> bool {
321 for event in events {
322 match self.sender.try_send(event) {
323 Ok(()) => {}
324 Err(err) => {
325 return if let mpsc::error::TrySendError::Full(event) = err {
326 debug!(
327 target: "txpool",
328 "[{:?}] failed to send pending tx; channel full",
329 event.transaction.hash(),
330 );
331 true
332 } else {
333 false
334 }
335 }
336 }
337 }
338 true
339 }
340}
341
342#[derive(Debug)]
344pub(crate) struct BlobTransactionSidecarListener {
345 pub(crate) sender: mpsc::Sender<NewBlobSidecar>,
346}
347
348#[derive(Debug, Copy, Clone, PartialEq, Eq)]
352pub enum TransactionListenerKind {
353 All,
355 PropagateOnly,
359}
360
361impl TransactionListenerKind {
362 #[inline]
364 pub const fn is_propagate_only(&self) -> bool {
365 matches!(self, Self::PropagateOnly)
366 }
367}