//! Transaction batching for `Pool` insertion for high-throughput scenarios
//!
//! This module provides transaction batching logic to reduce lock contention when processing
//! many concurrent transaction pool insertions.

use crate::{
    error::PoolError, AddedTransactionOutcome, PoolTransaction, TransactionOrigin, TransactionPool,
};
use pin_project::pin_project;
use std::{
    future::Future,
    pin::Pin,
    task::{ready, Context, Poll},
};
use tokio::sync::{mpsc, oneshot};

/// A single batch transaction request
///
/// Pairs a transaction (and its [`TransactionOrigin`]) with the one-shot channel over which
/// the pool insertion outcome is reported back to the submitter.
#[derive(Debug)]
pub struct BatchTxRequest<T: PoolTransaction> {
    /// Origin of the transaction (e.g. Local, External)
    origin: TransactionOrigin,
    /// Tx to be inserted in to the pool
    pool_tx: T,
    /// Channel to send result back to caller
    response_tx: oneshot::Sender<Result<AddedTransactionOutcome, PoolError>>,
}

28impl<T> BatchTxRequest<T>
29where
30    T: PoolTransaction,
31{
32    /// Create a new batch transaction request
33    pub const fn new(
34        origin: TransactionOrigin,
35        pool_tx: T,
36        response_tx: oneshot::Sender<Result<AddedTransactionOutcome, PoolError>>,
37    ) -> Self {
38        Self { origin, pool_tx, response_tx }
39    }
40}
41
/// Transaction batch processor that handles batch processing
///
/// Implements [`Future`]: once spawned it drains queued [`BatchTxRequest`]s from the channel
/// and inserts them into the pool in batches.
#[pin_project]
#[derive(Debug)]
pub struct BatchTxProcessor<Pool: TransactionPool> {
    /// The pool all batched transactions are inserted into.
    pool: Pool,
    /// Upper bound on how many requests are drained from the channel per batch.
    max_batch_size: usize,
    /// Scratch buffer the receiver drains requests into before they are handed off.
    buf: Vec<BatchTxRequest<Pool::Transaction>>,
    /// Receiving half of the request channel; the sender is handed out by `new`.
    #[pin]
    request_rx: mpsc::UnboundedReceiver<BatchTxRequest<Pool::Transaction>>,
}

53impl<Pool> BatchTxProcessor<Pool>
54where
55    Pool: TransactionPool + 'static,
56{
57    /// Create a new `BatchTxProcessor`
58    pub fn new(
59        pool: Pool,
60        max_batch_size: usize,
61    ) -> (Self, mpsc::UnboundedSender<BatchTxRequest<Pool::Transaction>>) {
62        let (request_tx, request_rx) = mpsc::unbounded_channel();
63
64        let processor = Self { pool, max_batch_size, buf: Vec::with_capacity(1), request_rx };
65
66        (processor, request_tx)
67    }
68
69    async fn process_request(pool: &Pool, req: BatchTxRequest<Pool::Transaction>) {
70        let BatchTxRequest { origin, pool_tx, response_tx } = req;
71        let pool_result = pool.add_transaction(origin, pool_tx).await;
72        let _ = response_tx.send(pool_result);
73    }
74
75    /// Process a batch of transaction requests with per-transaction origins
76    async fn process_batch(pool: &Pool, batch: Vec<BatchTxRequest<Pool::Transaction>>) {
77        if batch.len() == 1 {
78            Self::process_request(pool, batch.into_iter().next().expect("batch is not empty"))
79                .await;
80            return
81        }
82
83        let (transactions, response_txs): (Vec<_>, Vec<_>) =
84            batch.into_iter().map(|req| ((req.origin, req.pool_tx), req.response_tx)).unzip();
85
86        let pool_results = pool.add_transactions_with_origins(transactions).await;
87        for (response_tx, pool_result) in response_txs.into_iter().zip(pool_results) {
88            let _ = response_tx.send(pool_result);
89        }
90    }
91}
92
93impl<Pool> Future for BatchTxProcessor<Pool>
94where
95    Pool: TransactionPool + 'static,
96{
97    type Output = ();
98
99    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
100        let mut this = self.project();
101
102        loop {
103            // Drain all available requests from the receiver
104            ready!(this.request_rx.poll_recv_many(cx, this.buf, *this.max_batch_size));
105
106            if !this.buf.is_empty() {
107                let batch = std::mem::take(this.buf);
108                let pool = this.pool.clone();
109                tokio::spawn(async move {
110                    Self::process_batch(&pool, batch).await;
111                });
112                this.buf.reserve(1);
113
114                continue;
115            }
116
117            // No requests available, return Pending to wait for more
118            return Poll::Pending;
119        }
120    }
121}
122
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_utils::{testing_pool, MockTransaction};
    use futures::stream::{FuturesUnordered, StreamExt};
    use std::time::Duration;
    use tokio::time::timeout;

    /// Generous response deadline so the tests do not flake on slow or loaded CI machines;
    /// in practice responses arrive in well under a millisecond.
    const RESPONSE_TIMEOUT: Duration = Duration::from_secs(1);

    /// `process_batch` delivers one successful result per submitted request.
    #[tokio::test]
    async fn test_process_batch() {
        let pool = testing_pool();

        let mut batch_requests = Vec::new();
        let mut responses = Vec::new();

        for i in 0..100 {
            let tx = MockTransaction::legacy().with_nonce(i).with_gas_price(100);
            let (response_tx, response_rx) = tokio::sync::oneshot::channel();

            batch_requests.push(BatchTxRequest::new(TransactionOrigin::Local, tx, response_tx));
            responses.push(response_rx);
        }

        BatchTxProcessor::process_batch(&pool, batch_requests).await;

        for response_rx in responses {
            let result = timeout(RESPONSE_TIMEOUT, response_rx)
                .await
                .expect("Timeout waiting for response")
                .expect("Response channel was closed unexpectedly");
            assert!(result.is_ok());
        }
    }

    /// A spawned processor answers every request sent through the channel.
    #[tokio::test]
    async fn test_batch_processor() {
        let pool = testing_pool();
        let (processor, request_tx) = BatchTxProcessor::new(pool.clone(), 1000);

        // Spawn the processor
        let handle = tokio::spawn(processor);

        let mut responses = Vec::new();

        for i in 0..50 {
            let tx = MockTransaction::legacy().with_nonce(i).with_gas_price(100);
            let (response_tx, response_rx) = tokio::sync::oneshot::channel();

            request_tx
                .send(BatchTxRequest::new(TransactionOrigin::Local, tx, response_tx))
                .expect("Could not send batch tx");
            responses.push(response_rx);
        }

        // No fixed sleep needed: the per-response timeout below already waits for the
        // processor to catch up.
        for rx in responses {
            let result = timeout(RESPONSE_TIMEOUT, rx)
                .await
                .expect("Timeout waiting for response")
                .expect("Response channel was closed unexpectedly");
            assert!(result.is_ok());
        }

        drop(request_tx);
        handle.abort();
    }

    /// Individually queued transactions each receive a successful outcome.
    #[tokio::test]
    async fn test_add_transaction() {
        let pool = testing_pool();
        let (processor, request_tx) = BatchTxProcessor::new(pool.clone(), 1000);

        // Spawn the processor
        let handle = tokio::spawn(processor);

        let mut results = Vec::new();
        for i in 0..10 {
            let tx = MockTransaction::legacy().with_nonce(i).with_gas_price(100);
            let (response_tx, response_rx) = tokio::sync::oneshot::channel();
            let request = BatchTxRequest::new(TransactionOrigin::Local, tx, response_tx);
            request_tx.send(request).expect("Could not send batch tx");
            results.push(response_rx);
        }

        for res in results {
            let result = timeout(RESPONSE_TIMEOUT, res)
                .await
                .expect("Timeout waiting for transaction result");
            assert!(result.is_ok());
        }

        handle.abort();
    }

    /// Exactly `max_batch_size` concurrent submissions all complete successfully.
    #[tokio::test]
    async fn test_max_batch_size() {
        let pool = testing_pool();
        let max_batch_size = 10;
        let (processor, request_tx) = BatchTxProcessor::new(pool.clone(), max_batch_size);

        // Spawn batch processor with threshold
        let handle = tokio::spawn(processor);

        let mut futures = FuturesUnordered::new();
        for i in 0..max_batch_size {
            let tx = MockTransaction::legacy().with_nonce(i as u64).with_gas_price(100);
            let (response_tx, response_rx) = tokio::sync::oneshot::channel();
            let request = BatchTxRequest::new(TransactionOrigin::Local, tx, response_tx);
            let request_tx_clone = request_tx.clone();

            let tx_fut = async move {
                request_tx_clone.send(request).expect("Could not send batch tx");
                response_rx.await.expect("Could not receive batch response")
            };
            futures.push(tx_fut);
        }

        while let Some(result) = timeout(RESPONSE_TIMEOUT, futures.next())
            .await
            .expect("Timeout waiting for transaction result")
        {
            assert!(result.is_ok());
        }

        handle.abort();
    }
}