reth_transaction_pool/pool/
listener.rs
1use crate::{
4 pool::events::{FullTransactionEvent, NewTransactionEvent, TransactionEvent},
5 traits::{NewBlobSidecar, PropagateKind},
6 PoolTransaction, ValidPoolTransaction,
7};
8use alloy_primitives::{TxHash, B256};
9use futures_util::Stream;
10use std::{
11 collections::{hash_map::Entry, HashMap},
12 pin::Pin,
13 sync::Arc,
14 task::{Context, Poll},
15};
16use tokio::sync::mpsc::{
17 self as mpsc, error::TrySendError, Receiver, Sender, UnboundedReceiver, UnboundedSender,
18};
19use tracing::debug;
/// Capacity of the bounded channel that is created for every subscription to the events of
/// all transactions.
const TX_POOL_EVENT_CHANNEL_SIZE: usize = 1024;
22
23#[derive(Debug)]
25#[must_use = "streams do nothing unless polled"]
26pub struct TransactionEvents {
27 hash: TxHash,
28 events: UnboundedReceiver<TransactionEvent>,
29}
30
31impl TransactionEvents {
32 pub const fn hash(&self) -> TxHash {
34 self.hash
35 }
36}
37
38impl Stream for TransactionEvents {
39 type Item = TransactionEvent;
40
41 fn poll_next(
42 self: std::pin::Pin<&mut Self>,
43 cx: &mut std::task::Context<'_>,
44 ) -> std::task::Poll<Option<Self::Item>> {
45 self.get_mut().events.poll_recv(cx)
46 }
47}
48
49#[derive(Debug)]
51#[must_use = "streams do nothing unless polled"]
52pub struct AllTransactionsEvents<T: PoolTransaction> {
53 pub(crate) events: Receiver<FullTransactionEvent<T>>,
54}
55
56impl<T: PoolTransaction> AllTransactionsEvents<T> {
57 pub const fn new(events: Receiver<FullTransactionEvent<T>>) -> Self {
59 Self { events }
60 }
61}
62
63impl<T: PoolTransaction> Stream for AllTransactionsEvents<T> {
64 type Item = FullTransactionEvent<T>;
65
66 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
67 self.get_mut().events.poll_recv(cx)
68 }
69}
70
71#[derive(Debug)]
76pub(crate) struct PoolEventBroadcast<T: PoolTransaction> {
77 all_events_broadcaster: AllPoolEventsBroadcaster<T>,
79 broadcasters_by_hash: HashMap<TxHash, PoolEventBroadcaster>,
81}
82
83impl<T: PoolTransaction> Default for PoolEventBroadcast<T> {
84 fn default() -> Self {
85 Self {
86 all_events_broadcaster: AllPoolEventsBroadcaster::default(),
87 broadcasters_by_hash: HashMap::default(),
88 }
89 }
90}
91
92impl<T: PoolTransaction> PoolEventBroadcast<T> {
93 fn broadcast_event(
95 &mut self,
96 hash: &TxHash,
97 event: TransactionEvent,
98 pool_event: FullTransactionEvent<T>,
99 ) {
100 if let Entry::Occupied(mut sink) = self.broadcasters_by_hash.entry(*hash) {
102 sink.get_mut().broadcast(event.clone());
103
104 if sink.get().is_empty() || event.is_final() {
105 sink.remove();
106 }
107 }
108
109 self.all_events_broadcaster.broadcast(pool_event);
111 }
112
113 pub(crate) fn subscribe(&mut self, tx_hash: TxHash) -> TransactionEvents {
115 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
116
117 match self.broadcasters_by_hash.entry(tx_hash) {
118 Entry::Occupied(mut entry) => {
119 entry.get_mut().senders.push(tx);
120 }
121 Entry::Vacant(entry) => {
122 entry.insert(PoolEventBroadcaster { senders: vec![tx] });
123 }
124 };
125 TransactionEvents { hash: tx_hash, events: rx }
126 }
127
128 pub(crate) fn subscribe_all(&mut self) -> AllTransactionsEvents<T> {
130 let (tx, rx) = tokio::sync::mpsc::channel(TX_POOL_EVENT_CHANNEL_SIZE);
131 self.all_events_broadcaster.senders.push(tx);
132 AllTransactionsEvents::new(rx)
133 }
134
135 pub(crate) fn pending(&mut self, tx: &TxHash, replaced: Option<Arc<ValidPoolTransaction<T>>>) {
137 self.broadcast_event(tx, TransactionEvent::Pending, FullTransactionEvent::Pending(*tx));
138
139 if let Some(replaced) = replaced {
140 self.replaced(replaced, *tx);
142 }
143 }
144
145 pub(crate) fn replaced(&mut self, tx: Arc<ValidPoolTransaction<T>>, replaced_by: TxHash) {
147 let transaction = Arc::clone(&tx);
148 self.broadcast_event(
149 tx.hash(),
150 TransactionEvent::Replaced(replaced_by),
151 FullTransactionEvent::Replaced { transaction, replaced_by },
152 );
153 }
154
155 pub(crate) fn queued(&mut self, tx: &TxHash) {
157 self.broadcast_event(tx, TransactionEvent::Queued, FullTransactionEvent::Queued(*tx));
158 }
159
160 pub(crate) fn propagated(&mut self, tx: &TxHash, peers: Vec<PropagateKind>) {
162 let peers = Arc::new(peers);
163 self.broadcast_event(
164 tx,
165 TransactionEvent::Propagated(Arc::clone(&peers)),
166 FullTransactionEvent::Propagated(peers),
167 );
168 }
169
170 pub(crate) fn discarded(&mut self, tx: &TxHash) {
172 self.broadcast_event(tx, TransactionEvent::Discarded, FullTransactionEvent::Discarded(*tx));
173 }
174
175 pub(crate) fn invalid(&mut self, tx: &TxHash) {
177 self.broadcast_event(tx, TransactionEvent::Invalid, FullTransactionEvent::Invalid(*tx));
178 }
179
180 pub(crate) fn mined(&mut self, tx: &TxHash, block_hash: B256) {
182 self.broadcast_event(
183 tx,
184 TransactionEvent::Mined(block_hash),
185 FullTransactionEvent::Mined { tx_hash: *tx, block_hash },
186 );
187 }
188}
189
190#[derive(Debug)]
194struct AllPoolEventsBroadcaster<T: PoolTransaction> {
195 senders: Vec<Sender<FullTransactionEvent<T>>>,
197}
198
199impl<T: PoolTransaction> Default for AllPoolEventsBroadcaster<T> {
200 fn default() -> Self {
201 Self { senders: Vec::new() }
202 }
203}
204
205impl<T: PoolTransaction> AllPoolEventsBroadcaster<T> {
206 fn broadcast(&mut self, event: FullTransactionEvent<T>) {
208 self.senders.retain(|sender| match sender.try_send(event.clone()) {
209 Ok(_) | Err(TrySendError::Full(_)) => true,
210 Err(TrySendError::Closed(_)) => false,
211 })
212 }
213}
214
215#[derive(Default, Debug)]
219struct PoolEventBroadcaster {
220 senders: Vec<UnboundedSender<TransactionEvent>>,
222}
223
224impl PoolEventBroadcaster {
225 fn is_empty(&self) -> bool {
227 self.senders.is_empty()
228 }
229
230 fn broadcast(&mut self, event: TransactionEvent) {
232 self.senders.retain(|sender| sender.send(event.clone()).is_ok())
233 }
234}
235
236#[derive(Debug)]
238pub(crate) struct PendingTransactionHashListener {
239 pub(crate) sender: mpsc::Sender<TxHash>,
240 pub(crate) kind: TransactionListenerKind,
242}
243
244impl PendingTransactionHashListener {
245 pub(crate) fn send_all(&self, hashes: impl IntoIterator<Item = TxHash>) -> bool {
249 for tx_hash in hashes {
250 match self.sender.try_send(tx_hash) {
251 Ok(()) => {}
252 Err(err) => {
253 return if matches!(err, mpsc::error::TrySendError::Full(_)) {
254 debug!(
255 target: "txpool",
256 "[{:?}] failed to send pending tx; channel full",
257 tx_hash,
258 );
259 true
260 } else {
261 false
262 }
263 }
264 }
265 }
266 true
267 }
268}
269
270#[derive(Debug)]
272pub(crate) struct TransactionListener<T: PoolTransaction> {
273 pub(crate) sender: mpsc::Sender<NewTransactionEvent<T>>,
274 pub(crate) kind: TransactionListenerKind,
276}
277
278impl<T: PoolTransaction> TransactionListener<T> {
279 pub(crate) fn send(&self, event: NewTransactionEvent<T>) -> bool {
283 self.send_all(std::iter::once(event))
284 }
285
286 pub(crate) fn send_all(
290 &self,
291 events: impl IntoIterator<Item = NewTransactionEvent<T>>,
292 ) -> bool {
293 for event in events {
294 match self.sender.try_send(event) {
295 Ok(()) => {}
296 Err(err) => {
297 return if let mpsc::error::TrySendError::Full(event) = err {
298 debug!(
299 target: "txpool",
300 "[{:?}] failed to send pending tx; channel full",
301 event.transaction.hash(),
302 );
303 true
304 } else {
305 false
306 }
307 }
308 }
309 }
310 true
311 }
312}
313
314#[derive(Debug)]
316pub(crate) struct BlobTransactionSidecarListener {
317 pub(crate) sender: mpsc::Sender<NewBlobSidecar>,
318}
319
/// Selects which transactions a transaction listener is registered for.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum TransactionListenerKind {
    /// No restriction on the transactions.
    All,
    /// Restricted to propagation; presumably only transactions that may be propagated are
    /// reported (the filtering itself happens outside this type).
    PropagateOnly,
}

impl TransactionListenerKind {
    /// Returns `true` if this is [`Self::PropagateOnly`].
    #[inline]
    pub const fn is_propagate_only(&self) -> bool {
        match self {
            Self::PropagateOnly => true,
            Self::All => false,
        }
    }
}