reth_transaction_pool/
batcher.rs1use crate::{
7 error::PoolError, AddedTransactionOutcome, PoolTransaction, TransactionOrigin, TransactionPool,
8};
9use pin_project::pin_project;
10use std::{
11 future::Future,
12 pin::Pin,
13 task::{ready, Context, Poll},
14};
15use tokio::sync::{mpsc, oneshot};
16
17#[derive(Debug)]
19pub struct BatchTxRequest<T: PoolTransaction> {
20 origin: TransactionOrigin,
22 pool_tx: T,
24 response_tx: oneshot::Sender<Result<AddedTransactionOutcome, PoolError>>,
26}
27
28impl<T> BatchTxRequest<T>
29where
30 T: PoolTransaction,
31{
32 pub const fn new(
34 origin: TransactionOrigin,
35 pool_tx: T,
36 response_tx: oneshot::Sender<Result<AddedTransactionOutcome, PoolError>>,
37 ) -> Self {
38 Self { origin, pool_tx, response_tx }
39 }
40}
41
42#[pin_project]
44#[derive(Debug)]
45pub struct BatchTxProcessor<Pool: TransactionPool> {
46 pool: Pool,
47 max_batch_size: usize,
48 buf: Vec<BatchTxRequest<Pool::Transaction>>,
49 #[pin]
50 request_rx: mpsc::UnboundedReceiver<BatchTxRequest<Pool::Transaction>>,
51}
52
53impl<Pool> BatchTxProcessor<Pool>
54where
55 Pool: TransactionPool + 'static,
56{
57 pub fn new(
59 pool: Pool,
60 max_batch_size: usize,
61 ) -> (Self, mpsc::UnboundedSender<BatchTxRequest<Pool::Transaction>>) {
62 let (request_tx, request_rx) = mpsc::unbounded_channel();
63
64 let processor = Self { pool, max_batch_size, buf: Vec::with_capacity(1), request_rx };
65
66 (processor, request_tx)
67 }
68
69 async fn process_request(pool: &Pool, req: BatchTxRequest<Pool::Transaction>) {
70 let BatchTxRequest { origin, pool_tx, response_tx } = req;
71 let pool_result = pool.add_transaction(origin, pool_tx).await;
72 let _ = response_tx.send(pool_result);
73 }
74
75 async fn process_batch(pool: &Pool, batch: Vec<BatchTxRequest<Pool::Transaction>>) {
77 if batch.len() == 1 {
78 Self::process_request(pool, batch.into_iter().next().expect("batch is not empty"))
79 .await;
80 return
81 }
82
83 let (transactions, response_txs): (Vec<_>, Vec<_>) =
84 batch.into_iter().map(|req| ((req.origin, req.pool_tx), req.response_tx)).unzip();
85
86 let pool_results = pool.add_transactions_with_origins(transactions).await;
87 for (response_tx, pool_result) in response_txs.into_iter().zip(pool_results) {
88 let _ = response_tx.send(pool_result);
89 }
90 }
91}
92
93impl<Pool> Future for BatchTxProcessor<Pool>
94where
95 Pool: TransactionPool + 'static,
96{
97 type Output = ();
98
99 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
100 let mut this = self.project();
101
102 loop {
103 ready!(this.request_rx.poll_recv_many(cx, this.buf, *this.max_batch_size));
105
106 if !this.buf.is_empty() {
107 let batch = std::mem::take(this.buf);
108 let pool = this.pool.clone();
109 tokio::spawn(async move {
110 Self::process_batch(&pool, batch).await;
111 });
112 this.buf.reserve(1);
113
114 continue;
115 }
116
117 return Poll::Pending;
119 }
120 }
121}
122
123#[cfg(test)]
124mod tests {
125 use super::*;
126 use crate::test_utils::{testing_pool, MockTransaction};
127 use futures::stream::{FuturesUnordered, StreamExt};
128 use std::time::Duration;
129 use tokio::time::timeout;
130
131 #[tokio::test]
132 async fn test_process_batch() {
133 let pool = testing_pool();
134
135 let mut batch_requests = Vec::new();
136 let mut responses = Vec::new();
137
138 for i in 0..100 {
139 let tx = MockTransaction::legacy().with_nonce(i).with_gas_price(100);
140 let (response_tx, response_rx) = tokio::sync::oneshot::channel();
141
142 batch_requests.push(BatchTxRequest::new(TransactionOrigin::Local, tx, response_tx));
143 responses.push(response_rx);
144 }
145
146 BatchTxProcessor::process_batch(&pool, batch_requests).await;
147
148 for response_rx in responses {
149 let result = timeout(Duration::from_millis(5), response_rx)
150 .await
151 .expect("Timeout waiting for response")
152 .expect("Response channel was closed unexpectedly");
153 assert!(result.is_ok());
154 }
155 }
156
157 #[tokio::test]
158 async fn test_batch_processor() {
159 let pool = testing_pool();
160 let (processor, request_tx) = BatchTxProcessor::new(pool.clone(), 1000);
161
162 let handle = tokio::spawn(processor);
164
165 let mut responses = Vec::new();
166
167 for i in 0..50 {
168 let tx = MockTransaction::legacy().with_nonce(i).with_gas_price(100);
169 let (response_tx, response_rx) = tokio::sync::oneshot::channel();
170
171 request_tx
172 .send(BatchTxRequest::new(TransactionOrigin::Local, tx, response_tx))
173 .expect("Could not send batch tx");
174 responses.push(response_rx);
175 }
176
177 tokio::time::sleep(Duration::from_millis(10)).await;
178
179 for rx in responses {
180 let result = timeout(Duration::from_millis(10), rx)
181 .await
182 .expect("Timeout waiting for response")
183 .expect("Response channel was closed unexpectedly");
184 assert!(result.is_ok());
185 }
186
187 drop(request_tx);
188 handle.abort();
189 }
190
191 #[tokio::test]
192 async fn test_add_transaction() {
193 let pool = testing_pool();
194 let (processor, request_tx) = BatchTxProcessor::new(pool.clone(), 1000);
195
196 let handle = tokio::spawn(processor);
198
199 let mut results = Vec::new();
200 for i in 0..10 {
201 let tx = MockTransaction::legacy().with_nonce(i).with_gas_price(100);
202 let (response_tx, response_rx) = tokio::sync::oneshot::channel();
203 let request = BatchTxRequest::new(TransactionOrigin::Local, tx, response_tx);
204 request_tx.send(request).expect("Could not send batch tx");
205 results.push(response_rx);
206 }
207
208 for res in results {
209 let result = timeout(Duration::from_millis(10), res)
210 .await
211 .expect("Timeout waiting for transaction result");
212 assert!(result.is_ok());
213 }
214
215 handle.abort();
216 }
217
218 #[tokio::test]
219 async fn test_max_batch_size() {
220 let pool = testing_pool();
221 let max_batch_size = 10;
222 let (processor, request_tx) = BatchTxProcessor::new(pool.clone(), max_batch_size);
223
224 let handle = tokio::spawn(processor);
226
227 let mut futures = FuturesUnordered::new();
228 for i in 0..max_batch_size {
229 let tx = MockTransaction::legacy().with_nonce(i as u64).with_gas_price(100);
230 let (response_tx, response_rx) = tokio::sync::oneshot::channel();
231 let request = BatchTxRequest::new(TransactionOrigin::Local, tx, response_tx);
232 let request_tx_clone = request_tx.clone();
233
234 let tx_fut = async move {
235 request_tx_clone.send(request).expect("Could not send batch tx");
236 response_rx.await.expect("Could not receive batch response")
237 };
238 futures.push(tx_fut);
239 }
240
241 while let Some(result) = timeout(Duration::from_millis(5), futures.next())
242 .await
243 .expect("Timeout waiting for transaction result")
244 {
245 assert!(result.is_ok());
246 }
247
248 handle.abort();
249 }
250}