reth_transaction_pool/batcher.rs

//! Transaction batching for pool insertion in high-throughput scenarios.
//!
//! This module provides transaction batching logic to reduce lock contention when processing
//! many concurrent transaction pool insertions.
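//!
//! # Example
//!
//! A minimal usage sketch (not compiled); `pool` stands in for any [`TransactionPool`]
//! implementation and `tx` for one of its transactions:
//!
//! ```ignore
//! use tokio::sync::oneshot;
//!
//! // Create the processor (with an arbitrary max batch size) and the request sender.
//! let (processor, request_tx) = BatchTxProcessor::new(pool, 128);
//!
//! // Drive the processor as a background task; it drains queued requests and inserts
//! // them into the pool in batches as `TransactionOrigin::Local`.
//! tokio::spawn(processor);
//!
//! // Queue a transaction and await its individual insertion outcome.
//! let (response_tx, response_rx) = oneshot::channel();
//! request_tx.send(BatchTxRequest::new(tx, response_tx)).expect("processor is running");
//! let outcome = response_rx.await.expect("processor dropped the request");
//! ```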

use crate::{
    error::PoolError, AddedTransactionOutcome, PoolTransaction, TransactionOrigin, TransactionPool,
};
use pin_project::pin_project;
use std::{
    future::Future,
    pin::Pin,
    task::{Context, Poll},
};
use tokio::sync::{mpsc, oneshot};

/// A single batch transaction request.
///
/// All transactions processed through the batcher are considered local
/// transactions (`TransactionOrigin::Local`) when inserted into the pool.
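///
/// # Example
///
/// A minimal sketch of how a caller pairs a request with a response channel. `tx` is a
/// placeholder transaction and `request_tx` is the sender returned by
/// [`BatchTxProcessor::new`] (not compiled):
///
/// ```ignore
/// use tokio::sync::oneshot;
///
/// // Each request carries a oneshot sender for its individual insertion outcome.
/// let (response_tx, response_rx) = oneshot::channel();
/// request_tx.send(BatchTxRequest::new(tx, response_tx)).expect("processor is running");
///
/// // Resolves once the batch containing this transaction has been inserted into the pool.
/// let outcome = response_rx.await.expect("processor dropped the request");
/// ```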
#[derive(Debug)]
pub struct BatchTxRequest<T: PoolTransaction> {
    /// Transaction to be inserted into the pool
    pool_tx: T,
    /// Channel used to send the insertion result back to the caller
    response_tx: oneshot::Sender<Result<AddedTransactionOutcome, PoolError>>,
}

impl<T> BatchTxRequest<T>
where
    T: PoolTransaction,
{
    /// Create a new batch transaction request
    pub const fn new(
        pool_tx: T,
        response_tx: oneshot::Sender<Result<AddedTransactionOutcome, PoolError>>,
    ) -> Self {
        Self { pool_tx, response_tx }
    }
}

/// Processor that drains queued [`BatchTxRequest`]s and inserts them into the pool in batches
#[pin_project]
#[derive(Debug)]
pub struct BatchTxProcessor<Pool: TransactionPool> {
    /// The pool that batched transactions are inserted into
    pool: Pool,
    /// Maximum number of requests drained into a single batch
    max_batch_size: usize,
    /// Receiver for incoming [`BatchTxRequest`]s
    #[pin]
    request_rx: mpsc::UnboundedReceiver<BatchTxRequest<Pool::Transaction>>,
}

impl<Pool> BatchTxProcessor<Pool>
where
    Pool: TransactionPool + 'static,
{
    /// Create a new [`BatchTxProcessor`] and the sender used to submit [`BatchTxRequest`]s to it
    pub fn new(
        pool: Pool,
        max_batch_size: usize,
    ) -> (Self, mpsc::UnboundedSender<BatchTxRequest<Pool::Transaction>>) {
        let (request_tx, request_rx) = mpsc::unbounded_channel();

        let processor = Self { pool, max_batch_size, request_rx };

        (processor, request_tx)
    }

    /// Process a batch of transaction requests, inserting them all as `TransactionOrigin::Local`
    async fn process_batch(pool: &Pool, batch: Vec<BatchTxRequest<Pool::Transaction>>) {
        let (pool_transactions, response_txs): (Vec<_>, Vec<_>) =
            batch.into_iter().map(|req| (req.pool_tx, req.response_tx)).unzip();

        let pool_results = pool.add_transactions(TransactionOrigin::Local, pool_transactions).await;

        // Send each result back to its caller; ignore errors if the caller has gone away
        for (response_tx, pool_result) in response_txs.into_iter().zip(pool_results) {
            let _ = response_tx.send(pool_result);
        }
    }
}

impl<Pool> Future for BatchTxProcessor<Pool>
where
    Pool: TransactionPool + 'static,
{
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();

        loop {
            // Drain available requests from the receiver, up to `max_batch_size` per batch
            let mut batch = Vec::with_capacity(1);
            let mut channel_closed = false;
            loop {
                match this.request_rx.poll_recv(cx) {
                    Poll::Ready(Some(request)) => {
                        batch.push(request);

                        // Check if the max batch size threshold has been reached
                        if batch.len() >= *this.max_batch_size {
                            break;
                        }
                    }
                    Poll::Ready(None) => {
                        // All senders have been dropped, so no further requests can arrive
                        channel_closed = true;
                        break;
                    }
                    Poll::Pending => break,
                }
            }

            if !batch.is_empty() {
                // Process the batch on a separate task to keep draining new requests
                let pool = this.pool.clone();
                tokio::spawn(async move {
                    Self::process_batch(&pool, batch).await;
                });

                if !channel_closed {
                    continue;
                }
            }

            if channel_closed {
                // Every queued request has been dispatched and no new ones can be sent
                return Poll::Ready(());
            }

            // No requests available, return Pending to wait for more
            return Poll::Pending;
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_utils::{testing_pool, MockTransaction};
    use futures::stream::{FuturesUnordered, StreamExt};
    use std::time::Duration;
    use tokio::time::timeout;

    #[tokio::test]
    async fn test_process_batch() {
        let pool = testing_pool();

        let mut batch_requests = Vec::new();
        let mut responses = Vec::new();

        for i in 0..100 {
            let tx = MockTransaction::legacy().with_nonce(i).with_gas_price(100);
            let (response_tx, response_rx) = tokio::sync::oneshot::channel();

            batch_requests.push(BatchTxRequest::new(tx, response_tx));
            responses.push(response_rx);
        }

        BatchTxProcessor::process_batch(&pool, batch_requests).await;

        for response_rx in responses {
            let result = timeout(Duration::from_millis(5), response_rx)
                .await
                .expect("Timeout waiting for response")
                .expect("Response channel was closed unexpectedly");
            assert!(result.is_ok());
        }
    }

    #[tokio::test]
    async fn test_batch_processor() {
        let pool = testing_pool();
        let (processor, request_tx) = BatchTxProcessor::new(pool.clone(), 1000);

        // Spawn the processor
        let handle = tokio::spawn(processor);

        let mut responses = Vec::new();

        for i in 0..50 {
            let tx = MockTransaction::legacy().with_nonce(i).with_gas_price(100);
            let (response_tx, response_rx) = tokio::sync::oneshot::channel();

            request_tx.send(BatchTxRequest::new(tx, response_tx)).expect("Could not send batch tx");
            responses.push(response_rx);
        }

        tokio::time::sleep(Duration::from_millis(10)).await;

        for rx in responses {
            let result = timeout(Duration::from_millis(10), rx)
                .await
                .expect("Timeout waiting for response")
                .expect("Response channel was closed unexpectedly");
            assert!(result.is_ok());
        }

        drop(request_tx);
        handle.abort();
    }

    #[tokio::test]
    async fn test_add_transaction() {
        let pool = testing_pool();
        let (processor, request_tx) = BatchTxProcessor::new(pool.clone(), 1000);

        // Spawn the processor
        let handle = tokio::spawn(processor);

        let mut results = Vec::new();
        for i in 0..10 {
            let tx = MockTransaction::legacy().with_nonce(i).with_gas_price(100);
            let (response_tx, response_rx) = tokio::sync::oneshot::channel();
            let request = BatchTxRequest::new(tx, response_tx);
            request_tx.send(request).expect("Could not send batch tx");
            results.push(response_rx);
        }

        for res in results {
            let result = timeout(Duration::from_millis(10), res)
                .await
                .expect("Timeout waiting for transaction result")
                .expect("Response channel was closed unexpectedly");
            assert!(result.is_ok());
        }

        handle.abort();
    }

    #[tokio::test]
    async fn test_max_batch_size() {
        let pool = testing_pool();
        let max_batch_size = 10;
        let (processor, request_tx) = BatchTxProcessor::new(pool.clone(), max_batch_size);

        // Spawn batch processor with threshold
        let handle = tokio::spawn(processor);

        let mut futures = FuturesUnordered::new();
        for i in 0..max_batch_size {
            let tx = MockTransaction::legacy().with_nonce(i as u64).with_gas_price(100);
            let (response_tx, response_rx) = tokio::sync::oneshot::channel();
            let request = BatchTxRequest::new(tx, response_tx);
            let request_tx_clone = request_tx.clone();

            let tx_fut = async move {
                request_tx_clone.send(request).expect("Could not send batch tx");
                response_rx.await.expect("Could not receive batch response")
            };
            futures.push(tx_fut);
        }

        while let Some(result) = timeout(Duration::from_millis(5), futures.next())
            .await
            .expect("Timeout waiting for transaction result")
        {
            assert!(result.is_ok());
        }

        handle.abort();
    }
}