reth_transaction_pool/
batcher.rs

1//! Transaction batching for `Pool` insertion for high-throughput scenarios
2//!
3//! This module provides transaction batching logic to reduce lock contention when processing
4//! many concurrent transaction pool insertions.
5
6use crate::{
7    error::PoolError, AddedTransactionOutcome, PoolTransaction, TransactionOrigin, TransactionPool,
8};
9use pin_project::pin_project;
10use std::{
11    future::Future,
12    pin::Pin,
13    task::{ready, Context, Poll},
14};
15use tokio::sync::{mpsc, oneshot};
16
17/// A single batch transaction request
18/// All transactions processed through the batcher are considered local
19/// transactions (`TransactionOrigin::Local`) when inserted into the pool.
20#[derive(Debug)]
21pub struct BatchTxRequest<T: PoolTransaction> {
22    /// Tx to be inserted in to the pool
23    pool_tx: T,
24    /// Channel to send result back to caller
25    response_tx: oneshot::Sender<Result<AddedTransactionOutcome, PoolError>>,
26}
27
28impl<T> BatchTxRequest<T>
29where
30    T: PoolTransaction,
31{
32    /// Create a new batch transaction request
33    pub const fn new(
34        pool_tx: T,
35        response_tx: oneshot::Sender<Result<AddedTransactionOutcome, PoolError>>,
36    ) -> Self {
37        Self { pool_tx, response_tx }
38    }
39}
40
41/// Transaction batch processor that handles batch processing
42#[pin_project]
43#[derive(Debug)]
44pub struct BatchTxProcessor<Pool: TransactionPool> {
45    pool: Pool,
46    max_batch_size: usize,
47    buf: Vec<BatchTxRequest<Pool::Transaction>>,
48    #[pin]
49    request_rx: mpsc::UnboundedReceiver<BatchTxRequest<Pool::Transaction>>,
50}
51
52impl<Pool> BatchTxProcessor<Pool>
53where
54    Pool: TransactionPool + 'static,
55{
56    /// Create a new `BatchTxProcessor`
57    pub fn new(
58        pool: Pool,
59        max_batch_size: usize,
60    ) -> (Self, mpsc::UnboundedSender<BatchTxRequest<Pool::Transaction>>) {
61        let (request_tx, request_rx) = mpsc::unbounded_channel();
62
63        let processor = Self { pool, max_batch_size, buf: Vec::with_capacity(1), request_rx };
64
65        (processor, request_tx)
66    }
67
68    async fn process_request(pool: &Pool, req: BatchTxRequest<Pool::Transaction>) {
69        let BatchTxRequest { pool_tx, response_tx } = req;
70        let pool_result = pool.add_transaction(TransactionOrigin::Local, pool_tx).await;
71        let _ = response_tx.send(pool_result);
72    }
73
74    /// Process a batch of transaction requests, grouped by origin
75    async fn process_batch(pool: &Pool, mut batch: Vec<BatchTxRequest<Pool::Transaction>>) {
76        if batch.len() == 1 {
77            Self::process_request(pool, batch.remove(0)).await;
78            return
79        }
80
81        let (pool_transactions, response_tx): (Vec<_>, Vec<_>) =
82            batch.into_iter().map(|req| (req.pool_tx, req.response_tx)).unzip();
83
84        let pool_results = pool.add_transactions(TransactionOrigin::Local, pool_transactions).await;
85
86        for (response_tx, pool_result) in response_tx.into_iter().zip(pool_results) {
87            let _ = response_tx.send(pool_result);
88        }
89    }
90}
91
92impl<Pool> Future for BatchTxProcessor<Pool>
93where
94    Pool: TransactionPool + 'static,
95{
96    type Output = ();
97
98    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
99        let mut this = self.project();
100
101        loop {
102            // Drain all available requests from the receiver
103            ready!(this.request_rx.poll_recv_many(cx, this.buf, *this.max_batch_size));
104
105            if !this.buf.is_empty() {
106                let batch = std::mem::take(this.buf);
107                let pool = this.pool.clone();
108                tokio::spawn(async move {
109                    Self::process_batch(&pool, batch).await;
110                });
111                this.buf.reserve(1);
112
113                continue;
114            }
115
116            // No requests available, return Pending to wait for more
117            return Poll::Pending;
118        }
119    }
120}
121
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_utils::{testing_pool, MockTransaction};
    use futures::stream::{FuturesUnordered, StreamExt};
    use std::time::Duration;
    use tokio::time::timeout;

    /// Calls `process_batch` directly and checks every request receives an `Ok` result.
    #[tokio::test]
    async fn test_process_batch() {
        let pool = testing_pool();

        let mut batch_requests = Vec::new();
        let mut responses = Vec::new();

        for i in 0..100 {
            let tx = MockTransaction::legacy().with_nonce(i).with_gas_price(100);
            let (response_tx, response_rx) = tokio::sync::oneshot::channel();

            batch_requests.push(BatchTxRequest::new(tx, response_tx));
            responses.push(response_rx);
        }

        BatchTxProcessor::process_batch(&pool, batch_requests).await;

        for response_rx in responses {
            let result = timeout(Duration::from_millis(5), response_rx)
                .await
                .expect("Timeout waiting for response")
                .expect("Response channel was closed unexpectedly");
            assert!(result.is_ok());
        }
    }

    /// Sends requests through the channel to a spawned processor and checks every response.
    #[tokio::test]
    async fn test_batch_processor() {
        let pool = testing_pool();
        let (processor, request_tx) = BatchTxProcessor::new(pool.clone(), 1000);

        // Spawn the processor
        let handle = tokio::spawn(processor);

        let mut responses = Vec::new();

        for i in 0..50 {
            let tx = MockTransaction::legacy().with_nonce(i).with_gas_price(100);
            let (response_tx, response_rx) = tokio::sync::oneshot::channel();

            request_tx.send(BatchTxRequest::new(tx, response_tx)).expect("Could not send batch tx");
            responses.push(response_rx);
        }

        tokio::time::sleep(Duration::from_millis(10)).await;

        for rx in responses {
            let result = timeout(Duration::from_millis(10), rx)
                .await
                .expect("Timeout waiting for response")
                .expect("Response channel was closed unexpectedly");
            assert!(result.is_ok());
        }

        drop(request_tx);
        handle.abort();
    }

    /// Submits a handful of transactions and checks that each one is accepted by the pool.
    #[tokio::test]
    async fn test_add_transaction() {
        let pool = testing_pool();
        let (processor, request_tx) = BatchTxProcessor::new(pool.clone(), 1000);

        // Spawn the processor
        let handle = tokio::spawn(processor);

        let mut results = Vec::new();
        for i in 0..10 {
            let tx = MockTransaction::legacy().with_nonce(i).with_gas_price(100);
            let (response_tx, response_rx) = tokio::sync::oneshot::channel();
            let request = BatchTxRequest::new(tx, response_tx);
            request_tx.send(request).expect("Could not send batch tx");
            results.push(response_rx);
        }

        for res in results {
            // Unwrap both the timeout and the channel layer so the assertion below checks the
            // pool's insertion result rather than merely that a response was delivered.
            let result = timeout(Duration::from_millis(10), res)
                .await
                .expect("Timeout waiting for transaction result")
                .expect("Response channel was closed unexpectedly");
            assert!(result.is_ok());
        }

        handle.abort();
    }

    /// Submits exactly `max_batch_size` requests concurrently and checks every result.
    #[tokio::test]
    async fn test_max_batch_size() {
        let pool = testing_pool();
        let max_batch_size = 10;
        let (processor, request_tx) = BatchTxProcessor::new(pool.clone(), max_batch_size);

        // Spawn batch processor with threshold
        let handle = tokio::spawn(processor);

        let mut futures = FuturesUnordered::new();
        for i in 0..max_batch_size {
            let tx = MockTransaction::legacy().with_nonce(i as u64).with_gas_price(100);
            let (response_tx, response_rx) = tokio::sync::oneshot::channel();
            let request = BatchTxRequest::new(tx, response_tx);
            let request_tx_clone = request_tx.clone();

            let tx_fut = async move {
                request_tx_clone.send(request).expect("Could not send batch tx");
                response_rx.await.expect("Could not receive batch response")
            };
            futures.push(tx_fut);
        }

        while let Some(result) = timeout(Duration::from_millis(5), futures.next())
            .await
            .expect("Timeout waiting for transaction result")
        {
            assert!(result.is_ok());
        }

        handle.abort();
    }
}