reth_transaction_pool/
batcher.rs1use crate::{
7 error::PoolError, AddedTransactionOutcome, PoolTransaction, TransactionOrigin, TransactionPool,
8};
9use pin_project::pin_project;
10use std::{
11 future::Future,
12 pin::Pin,
13 task::{ready, Context, Poll},
14};
15use tokio::sync::{mpsc, oneshot};
16
17#[derive(Debug)]
21pub struct BatchTxRequest<T: PoolTransaction> {
22 pool_tx: T,
24 response_tx: oneshot::Sender<Result<AddedTransactionOutcome, PoolError>>,
26}
27
28impl<T> BatchTxRequest<T>
29where
30 T: PoolTransaction,
31{
32 pub const fn new(
34 pool_tx: T,
35 response_tx: oneshot::Sender<Result<AddedTransactionOutcome, PoolError>>,
36 ) -> Self {
37 Self { pool_tx, response_tx }
38 }
39}
40
41#[pin_project]
43#[derive(Debug)]
44pub struct BatchTxProcessor<Pool: TransactionPool> {
45 pool: Pool,
46 max_batch_size: usize,
47 buf: Vec<BatchTxRequest<Pool::Transaction>>,
48 #[pin]
49 request_rx: mpsc::UnboundedReceiver<BatchTxRequest<Pool::Transaction>>,
50}
51
52impl<Pool> BatchTxProcessor<Pool>
53where
54 Pool: TransactionPool + 'static,
55{
56 pub fn new(
58 pool: Pool,
59 max_batch_size: usize,
60 ) -> (Self, mpsc::UnboundedSender<BatchTxRequest<Pool::Transaction>>) {
61 let (request_tx, request_rx) = mpsc::unbounded_channel();
62
63 let processor = Self { pool, max_batch_size, buf: Vec::with_capacity(1), request_rx };
64
65 (processor, request_tx)
66 }
67
68 async fn process_request(pool: &Pool, req: BatchTxRequest<Pool::Transaction>) {
69 let BatchTxRequest { pool_tx, response_tx } = req;
70 let pool_result = pool.add_transaction(TransactionOrigin::Local, pool_tx).await;
71 let _ = response_tx.send(pool_result);
72 }
73
74 async fn process_batch(pool: &Pool, mut batch: Vec<BatchTxRequest<Pool::Transaction>>) {
76 if batch.len() == 1 {
77 Self::process_request(pool, batch.remove(0)).await;
78 return
79 }
80
81 let (pool_transactions, response_tx): (Vec<_>, Vec<_>) =
82 batch.into_iter().map(|req| (req.pool_tx, req.response_tx)).unzip();
83
84 let pool_results = pool.add_transactions(TransactionOrigin::Local, pool_transactions).await;
85
86 for (response_tx, pool_result) in response_tx.into_iter().zip(pool_results) {
87 let _ = response_tx.send(pool_result);
88 }
89 }
90}
91
92impl<Pool> Future for BatchTxProcessor<Pool>
93where
94 Pool: TransactionPool + 'static,
95{
96 type Output = ();
97
98 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
99 let mut this = self.project();
100
101 loop {
102 ready!(this.request_rx.poll_recv_many(cx, this.buf, *this.max_batch_size));
104
105 if !this.buf.is_empty() {
106 let batch = std::mem::take(this.buf);
107 let pool = this.pool.clone();
108 tokio::spawn(async move {
109 Self::process_batch(&pool, batch).await;
110 });
111 this.buf.reserve(1);
112
113 continue;
114 }
115
116 return Poll::Pending;
118 }
119 }
120}
121
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_utils::{testing_pool, MockTransaction};
    use futures::stream::{FuturesUnordered, StreamExt};
    use std::time::Duration;
    use tokio::time::timeout;

    /// Calls `process_batch` directly with 100 requests and checks that every
    /// response channel receives an `Ok` pool result.
    #[tokio::test]
    async fn test_process_batch() {
        let pool = testing_pool();

        let mut batch_requests = Vec::new();
        let mut responses = Vec::new();

        for i in 0..100 {
            let tx = MockTransaction::legacy().with_nonce(i).with_gas_price(100);
            let (response_tx, response_rx) = tokio::sync::oneshot::channel();

            batch_requests.push(BatchTxRequest::new(tx, response_tx));
            responses.push(response_rx);
        }

        BatchTxProcessor::process_batch(&pool, batch_requests).await;

        for response_rx in responses {
            let result = timeout(Duration::from_millis(5), response_rx)
                .await
                .expect("Timeout waiting for response")
                .expect("Response channel was closed unexpectedly");
            assert!(result.is_ok());
        }
    }

    /// Sends 50 requests through a spawned processor and checks that each one
    /// is answered with an `Ok` pool result.
    #[tokio::test]
    async fn test_batch_processor() {
        let pool = testing_pool();
        let (processor, request_tx) = BatchTxProcessor::new(pool.clone(), 1000);

        let handle = tokio::spawn(processor);

        let mut responses = Vec::new();

        for i in 0..50 {
            let tx = MockTransaction::legacy().with_nonce(i).with_gas_price(100);
            let (response_tx, response_rx) = tokio::sync::oneshot::channel();

            request_tx.send(BatchTxRequest::new(tx, response_tx)).expect("Could not send batch tx");
            responses.push(response_rx);
        }

        // Give the spawned processor a moment to pick the requests up.
        tokio::time::sleep(Duration::from_millis(10)).await;

        for rx in responses {
            let result = timeout(Duration::from_millis(10), rx)
                .await
                .expect("Timeout waiting for response")
                .expect("Response channel was closed unexpectedly");
            assert!(result.is_ok());
        }

        drop(request_tx);
        handle.abort();
    }

    /// Sends 10 requests through a spawned processor and checks that each
    /// transaction is accepted by the pool.
    #[tokio::test]
    async fn test_add_transaction() {
        let pool = testing_pool();
        let (processor, request_tx) = BatchTxProcessor::new(pool.clone(), 1000);

        let handle = tokio::spawn(processor);

        let mut results = Vec::new();
        for i in 0..10 {
            let tx = MockTransaction::legacy().with_nonce(i).with_gas_price(100);
            let (response_tx, response_rx) = tokio::sync::oneshot::channel();
            let request = BatchTxRequest::new(tx, response_tx);
            request_tx.send(request).expect("Could not send batch tx");
            results.push(response_rx);
        }

        for res in results {
            // Unwrap the timeout and the oneshot layer so the assertion below
            // checks the pool's result rather than only the channel receive.
            let result = timeout(Duration::from_millis(10), res)
                .await
                .expect("Timeout waiting for transaction result")
                .expect("Response channel was closed unexpectedly");
            assert!(result.is_ok());
        }

        handle.abort();
    }

    /// Sends exactly `max_batch_size` requests from independent futures and
    /// checks that all of them are answered with an `Ok` pool result.
    #[tokio::test]
    async fn test_max_batch_size() {
        let pool = testing_pool();
        let max_batch_size = 10;
        let (processor, request_tx) = BatchTxProcessor::new(pool.clone(), max_batch_size);

        let handle = tokio::spawn(processor);

        let mut futures = FuturesUnordered::new();
        for i in 0..max_batch_size {
            let tx = MockTransaction::legacy().with_nonce(i as u64).with_gas_price(100);
            let (response_tx, response_rx) = tokio::sync::oneshot::channel();
            let request = BatchTxRequest::new(tx, response_tx);
            let request_tx_clone = request_tx.clone();

            let tx_fut = async move {
                request_tx_clone.send(request).expect("Could not send batch tx");
                response_rx.await.expect("Could not receive batch response")
            };
            futures.push(tx_fut);
        }

        while let Some(result) = timeout(Duration::from_millis(5), futures.next())
            .await
            .expect("Timeout waiting for transaction result")
        {
            assert!(result.is_ok());
        }

        handle.abort();
    }
}