reth_rpc/
txpool.rs

1use core::fmt;
2use std::collections::BTreeMap;
3
4use alloy_consensus::Transaction;
5use alloy_primitives::Address;
6use alloy_rpc_types_txpool::{
7    TxpoolContent, TxpoolContentFrom, TxpoolInspect, TxpoolInspectSummary, TxpoolStatus,
8};
9use async_trait::async_trait;
10use jsonrpsee::core::RpcResult;
11use reth_rpc_api::TxPoolApiServer;
12use reth_rpc_types_compat::TransactionCompat;
13use reth_transaction_pool::{
14    AllPoolTransactions, PoolConsensusTx, PoolTransaction, TransactionPool,
15};
16use tracing::trace;
17
/// `txpool` API implementation.
///
/// This type provides the functionality for handling `txpool` related requests.
///
/// The struct itself places no bounds on `Pool` or `Eth`; the bounds required to serve requests
/// are declared on the `impl` blocks that use them.
#[derive(Clone)]
pub struct TxPoolApi<Pool, Eth> {
    /// An interface to interact with the pool
    pool: Pool,
    /// Response builder used to turn pool transactions into RPC transaction objects (via
    /// `fill_pending`) when assembling `txpool_content` style responses.
    tx_resp_builder: Eth,
}
27
28impl<Pool, Eth> TxPoolApi<Pool, Eth> {
29    /// Creates a new instance of `TxpoolApi`.
30    pub const fn new(pool: Pool, tx_resp_builder: Eth) -> Self {
31        Self { pool, tx_resp_builder }
32    }
33}
34
35impl<Pool, Eth> TxPoolApi<Pool, Eth>
36where
37    Pool: TransactionPool<Transaction: PoolTransaction<Consensus: Transaction>> + 'static,
38    Eth: TransactionCompat<PoolConsensusTx<Pool>>,
39{
40    fn content(&self) -> Result<TxpoolContent<Eth::Transaction>, Eth::Error> {
41        #[inline]
42        fn insert<Tx, RpcTxB>(
43            tx: &Tx,
44            content: &mut BTreeMap<Address, BTreeMap<String, RpcTxB::Transaction>>,
45            resp_builder: &RpcTxB,
46        ) -> Result<(), RpcTxB::Error>
47        where
48            Tx: PoolTransaction,
49            RpcTxB: TransactionCompat<Tx::Consensus>,
50        {
51            content.entry(tx.sender()).or_default().insert(
52                tx.nonce().to_string(),
53                resp_builder.fill_pending(tx.clone_into_consensus())?,
54            );
55
56            Ok(())
57        }
58
59        let AllPoolTransactions { pending, queued } = self.pool.all_transactions();
60
61        let mut content = TxpoolContent { pending: BTreeMap::new(), queued: BTreeMap::new() };
62        for pending in pending {
63            insert::<_, Eth>(&pending.transaction, &mut content.pending, &self.tx_resp_builder)?;
64        }
65        for queued in queued {
66            insert::<_, Eth>(&queued.transaction, &mut content.queued, &self.tx_resp_builder)?;
67        }
68
69        Ok(content)
70    }
71}
72
73#[async_trait]
74impl<Pool, Eth> TxPoolApiServer<Eth::Transaction> for TxPoolApi<Pool, Eth>
75where
76    Pool: TransactionPool<Transaction: PoolTransaction<Consensus: Transaction>> + 'static,
77    Eth: TransactionCompat<PoolConsensusTx<Pool>> + 'static,
78{
79    /// Returns the number of transactions currently pending for inclusion in the next block(s), as
80    /// well as the ones that are being scheduled for future execution only.
81    /// Ref: [Here](https://geth.ethereum.org/docs/rpc/ns-txpool#txpool_status)
82    ///
83    /// Handler for `txpool_status`
84    async fn txpool_status(&self) -> RpcResult<TxpoolStatus> {
85        trace!(target: "rpc::eth", "Serving txpool_status");
86        let all = self.pool.all_transactions();
87        Ok(TxpoolStatus { pending: all.pending.len() as u64, queued: all.queued.len() as u64 })
88    }
89
90    /// Returns a summary of all the transactions currently pending for inclusion in the next
91    /// block(s), as well as the ones that are being scheduled for future execution only.
92    ///
93    /// See [here](https://geth.ethereum.org/docs/rpc/ns-txpool#txpool_inspect) for more details
94    ///
95    /// Handler for `txpool_inspect`
96    async fn txpool_inspect(&self) -> RpcResult<TxpoolInspect> {
97        trace!(target: "rpc::eth", "Serving txpool_inspect");
98
99        #[inline]
100        fn insert<T: PoolTransaction<Consensus: Transaction>>(
101            tx: &T,
102            inspect: &mut BTreeMap<Address, BTreeMap<String, TxpoolInspectSummary>>,
103        ) {
104            let entry = inspect.entry(tx.sender()).or_default();
105            let tx = tx.clone_into_consensus();
106            entry.insert(tx.nonce().to_string(), tx.into_inner().into());
107        }
108
109        let AllPoolTransactions { pending, queued } = self.pool.all_transactions();
110
111        Ok(TxpoolInspect {
112            pending: pending.iter().fold(Default::default(), |mut acc, tx| {
113                insert(&tx.transaction, &mut acc);
114                acc
115            }),
116            queued: queued.iter().fold(Default::default(), |mut acc, tx| {
117                insert(&tx.transaction, &mut acc);
118                acc
119            }),
120        })
121    }
122
123    /// Retrieves the transactions contained within the txpool, returning pending as well as queued
124    /// transactions of this address, grouped by nonce.
125    ///
126    /// See [here](https://geth.ethereum.org/docs/rpc/ns-txpool#txpool_contentFrom) for more details
127    /// Handler for `txpool_contentFrom`
128    async fn txpool_content_from(
129        &self,
130        from: Address,
131    ) -> RpcResult<TxpoolContentFrom<Eth::Transaction>> {
132        trace!(target: "rpc::eth", ?from, "Serving txpool_contentFrom");
133        Ok(self.content().map_err(Into::into)?.remove_from(&from))
134    }
135
136    /// Returns the details of all transactions currently pending for inclusion in the next
137    /// block(s), as well as the ones that are being scheduled for future execution only.
138    ///
139    /// See [here](https://geth.ethereum.org/docs/rpc/ns-txpool#txpool_content) for more details
140    /// Handler for `txpool_content`
141    async fn txpool_content(&self) -> RpcResult<TxpoolContent<Eth::Transaction>> {
142        trace!(target: "rpc::eth", "Serving txpool_content");
143        Ok(self.content().map_err(Into::into)?)
144    }
145}
146
147impl<Pool, Eth> fmt::Debug for TxPoolApi<Pool, Eth> {
148    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
149        f.debug_struct("TxpoolApi").finish_non_exhaustive()
150    }
151}