reth_transaction_pool/
batcher.rs1use crate::{
7 error::PoolError, AddedTransactionOutcome, PoolTransaction, TransactionOrigin, TransactionPool,
8};
9use pin_project::pin_project;
10use std::{
11 future::Future,
12 pin::Pin,
13 task::{Context, Poll},
14};
15use tokio::sync::{mpsc, oneshot};
16
17#[derive(Debug)]
21pub struct BatchTxRequest<T: PoolTransaction> {
22 pool_tx: T,
24 response_tx: oneshot::Sender<Result<AddedTransactionOutcome, PoolError>>,
26}
27
28impl<T> BatchTxRequest<T>
29where
30 T: PoolTransaction,
31{
32 pub const fn new(
34 pool_tx: T,
35 response_tx: oneshot::Sender<Result<AddedTransactionOutcome, PoolError>>,
36 ) -> Self {
37 Self { pool_tx, response_tx }
38 }
39}
40
41#[pin_project]
43#[derive(Debug)]
44pub struct BatchTxProcessor<Pool: TransactionPool> {
45 pool: Pool,
46 max_batch_size: usize,
47 #[pin]
48 request_rx: mpsc::UnboundedReceiver<BatchTxRequest<Pool::Transaction>>,
49}
50
51impl<Pool> BatchTxProcessor<Pool>
52where
53 Pool: TransactionPool + 'static,
54{
55 pub fn new(
57 pool: Pool,
58 max_batch_size: usize,
59 ) -> (Self, mpsc::UnboundedSender<BatchTxRequest<Pool::Transaction>>) {
60 let (request_tx, request_rx) = mpsc::unbounded_channel();
61
62 let processor = Self { pool, max_batch_size, request_rx };
63
64 (processor, request_tx)
65 }
66
67 async fn process_batch(pool: &Pool, batch: Vec<BatchTxRequest<Pool::Transaction>>) {
69 let (pool_transactions, response_tx): (Vec<_>, Vec<_>) =
70 batch.into_iter().map(|req| (req.pool_tx, req.response_tx)).unzip();
71
72 let pool_results = pool.add_transactions(TransactionOrigin::Local, pool_transactions).await;
73
74 for (response_tx, pool_result) in response_tx.into_iter().zip(pool_results) {
75 let _ = response_tx.send(pool_result);
76 }
77 }
78}
79
80impl<Pool> Future for BatchTxProcessor<Pool>
81where
82 Pool: TransactionPool + 'static,
83{
84 type Output = ();
85
86 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
87 let mut this = self.project();
88
89 loop {
90 let mut batch = Vec::with_capacity(1);
92 while let Poll::Ready(Some(request)) = this.request_rx.poll_recv(cx) {
93 batch.push(request);
94
95 if batch.len() >= *this.max_batch_size {
97 break;
98 }
99 }
100
101 if !batch.is_empty() {
102 let pool = this.pool.clone();
103 tokio::spawn(async move {
104 Self::process_batch(&pool, batch).await;
105 });
106
107 continue;
108 }
109
110 return Poll::Pending;
112 }
113 }
114}
115
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_utils::{testing_pool, MockTransaction};
    use futures::stream::{FuturesUnordered, StreamExt};
    use std::time::Duration;
    use tokio::time::timeout;

    /// Upper bound on how long a test waits for a single response.
    ///
    /// Deliberately generous: it only bounds the failure case, so a loaded
    /// machine does not turn a slow run into a spurious failure.
    const RESPONSE_TIMEOUT: Duration = Duration::from_secs(5);

    /// Calling `process_batch` directly answers every request in the batch.
    #[tokio::test]
    async fn test_process_batch() {
        let pool = testing_pool();

        let mut batch_requests = Vec::new();
        let mut responses = Vec::new();

        for i in 0..100 {
            let tx = MockTransaction::legacy().with_nonce(i).with_gas_price(100);
            let (response_tx, response_rx) = tokio::sync::oneshot::channel();

            batch_requests.push(BatchTxRequest::new(tx, response_tx));
            responses.push(response_rx);
        }

        BatchTxProcessor::process_batch(&pool, batch_requests).await;

        for response_rx in responses {
            let result = timeout(RESPONSE_TIMEOUT, response_rx)
                .await
                .expect("Timeout waiting for response")
                .expect("Response channel was closed unexpectedly");
            assert!(result.is_ok());
        }
    }

    /// Requests sent through the channel are picked up by the spawned
    /// processor and each one receives a successful response.
    #[tokio::test]
    async fn test_batch_processor() {
        let pool = testing_pool();
        let (processor, request_tx) = BatchTxProcessor::new(pool.clone(), 1000);

        let handle = tokio::spawn(processor);

        let mut responses = Vec::new();

        for i in 0..50 {
            let tx = MockTransaction::legacy().with_nonce(i).with_gas_price(100);
            let (response_tx, response_rx) = tokio::sync::oneshot::channel();

            request_tx.send(BatchTxRequest::new(tx, response_tx)).expect("Could not send batch tx");
            responses.push(response_rx);
        }

        // No fixed sleep needed: each receiver is awaited under a timeout.
        for rx in responses {
            let result = timeout(RESPONSE_TIMEOUT, rx)
                .await
                .expect("Timeout waiting for response")
                .expect("Response channel was closed unexpectedly");
            assert!(result.is_ok());
        }

        drop(request_tx);
        handle.abort();
    }

    /// Every queued transaction is reported as successfully added.
    #[tokio::test]
    async fn test_add_transaction() {
        let pool = testing_pool();
        let (processor, request_tx) = BatchTxProcessor::new(pool.clone(), 1000);

        let handle = tokio::spawn(processor);

        let mut results = Vec::new();
        for i in 0..10 {
            let tx = MockTransaction::legacy().with_nonce(i).with_gas_price(100);
            let (response_tx, response_rx) = tokio::sync::oneshot::channel();
            let request = BatchTxRequest::new(tx, response_tx);
            request_tx.send(request).expect("Could not send batch tx");
            results.push(response_rx);
        }

        for res in results {
            // Unwrap the channel layer first so the assertion below checks the
            // pool's verdict rather than just delivery of the message.
            let result = timeout(RESPONSE_TIMEOUT, res)
                .await
                .expect("Timeout waiting for transaction result")
                .expect("Response channel was closed unexpectedly");
            assert!(result.is_ok());
        }

        handle.abort();
    }

    /// Sending exactly `max_batch_size` requests gets every one answered.
    #[tokio::test]
    async fn test_max_batch_size() {
        let pool = testing_pool();
        let max_batch_size = 10;
        let (processor, request_tx) = BatchTxProcessor::new(pool.clone(), max_batch_size);

        let handle = tokio::spawn(processor);

        let mut futures = FuturesUnordered::new();
        for i in 0..max_batch_size {
            let tx = MockTransaction::legacy().with_nonce(i as u64).with_gas_price(100);
            let (response_tx, response_rx) = tokio::sync::oneshot::channel();
            let request = BatchTxRequest::new(tx, response_tx);
            let request_tx_clone = request_tx.clone();

            let tx_fut = async move {
                request_tx_clone.send(request).expect("Could not send batch tx");
                response_rx.await.expect("Could not receive batch response")
            };
            futures.push(tx_fut);
        }

        while let Some(result) = timeout(RESPONSE_TIMEOUT, futures.next())
            .await
            .expect("Timeout waiting for transaction result")
        {
            assert!(result.is_ok());
        }

        handle.abort();
    }
}