reth_rpc/
txpool.rs

1use core::fmt;
2use std::collections::BTreeMap;
3
4use alloy_consensus::Transaction;
5use alloy_primitives::Address;
6use alloy_rpc_types_txpool::{
7    TxpoolContent, TxpoolContentFrom, TxpoolInspect, TxpoolInspectSummary, TxpoolStatus,
8};
9use async_trait::async_trait;
10use jsonrpsee::core::RpcResult;
11use reth_primitives_traits::NodePrimitives;
12use reth_rpc_api::TxPoolApiServer;
13use reth_rpc_convert::{RpcConvert, RpcTypes};
14use reth_rpc_eth_api::RpcTransaction;
15use reth_transaction_pool::{
16    AllPoolTransactions, PoolConsensusTx, PoolTransaction, TransactionPool,
17};
18use tracing::trace;
19
/// Handler type backing the `txpool` RPC namespace.
///
/// It pairs a transaction pool handle with the converter that turns pooled transactions into
/// their RPC response representation.
#[derive(Clone)]
pub struct TxPoolApi<Pool, Eth> {
    /// Handle to the transaction pool whose contents are served.
    pool: Pool,
    /// Converter used to build RPC transaction responses from pooled transactions.
    tx_resp_builder: Eth,
}
29
30impl<Pool, Eth> TxPoolApi<Pool, Eth> {
31    /// Creates a new instance of `TxpoolApi`.
32    pub const fn new(pool: Pool, tx_resp_builder: Eth) -> Self {
33        Self { pool, tx_resp_builder }
34    }
35}
36
37impl<Pool, Eth> TxPoolApi<Pool, Eth>
38where
39    Pool: TransactionPool<Transaction: PoolTransaction<Consensus: Transaction>> + 'static,
40    Eth: RpcConvert<Primitives: NodePrimitives<SignedTx = PoolConsensusTx<Pool>>>,
41{
42    fn content(&self) -> Result<TxpoolContent<RpcTransaction<Eth::Network>>, Eth::Error> {
43        #[inline]
44        fn insert<Tx, RpcTxB>(
45            tx: &Tx,
46            content: &mut BTreeMap<
47                Address,
48                BTreeMap<String, <RpcTxB::Network as RpcTypes>::TransactionResponse>,
49            >,
50            resp_builder: &RpcTxB,
51        ) -> Result<(), RpcTxB::Error>
52        where
53            Tx: PoolTransaction,
54            RpcTxB: RpcConvert<Primitives: NodePrimitives<SignedTx = Tx::Consensus>>,
55        {
56            content.entry(tx.sender()).or_default().insert(
57                tx.nonce().to_string(),
58                resp_builder.fill_pending(tx.clone_into_consensus())?,
59            );
60
61            Ok(())
62        }
63
64        let AllPoolTransactions { pending, queued } = self.pool.all_transactions();
65
66        let mut content = TxpoolContent::default();
67        for pending in pending {
68            insert::<_, Eth>(&pending.transaction, &mut content.pending, &self.tx_resp_builder)?;
69        }
70        for queued in queued {
71            insert::<_, Eth>(&queued.transaction, &mut content.queued, &self.tx_resp_builder)?;
72        }
73
74        Ok(content)
75    }
76}
77
78#[async_trait]
79impl<Pool, Eth> TxPoolApiServer<RpcTransaction<Eth::Network>> for TxPoolApi<Pool, Eth>
80where
81    Pool: TransactionPool<Transaction: PoolTransaction<Consensus: Transaction>> + 'static,
82    Eth: RpcConvert<Primitives: NodePrimitives<SignedTx = PoolConsensusTx<Pool>>> + 'static,
83{
84    /// Returns the number of transactions currently pending for inclusion in the next block(s), as
85    /// well as the ones that are being scheduled for future execution only.
86    /// Ref: [Here](https://geth.ethereum.org/docs/rpc/ns-txpool#txpool_status)
87    ///
88    /// Handler for `txpool_status`
89    async fn txpool_status(&self) -> RpcResult<TxpoolStatus> {
90        trace!(target: "rpc::eth", "Serving txpool_status");
91        let (pending, queued) = self.pool.pending_and_queued_txn_count();
92        Ok(TxpoolStatus { pending: pending as u64, queued: queued as u64 })
93    }
94
95    /// Returns a summary of all the transactions currently pending for inclusion in the next
96    /// block(s), as well as the ones that are being scheduled for future execution only.
97    ///
98    /// See [here](https://geth.ethereum.org/docs/rpc/ns-txpool#txpool_inspect) for more details
99    ///
100    /// Handler for `txpool_inspect`
101    async fn txpool_inspect(&self) -> RpcResult<TxpoolInspect> {
102        trace!(target: "rpc::eth", "Serving txpool_inspect");
103
104        #[inline]
105        fn insert<T: PoolTransaction<Consensus: Transaction>>(
106            tx: &T,
107            inspect: &mut BTreeMap<Address, BTreeMap<String, TxpoolInspectSummary>>,
108        ) {
109            let entry = inspect.entry(tx.sender()).or_default();
110            let tx = tx.clone_into_consensus();
111            entry.insert(tx.nonce().to_string(), tx.into_inner().into());
112        }
113
114        let AllPoolTransactions { pending, queued } = self.pool.all_transactions();
115
116        Ok(TxpoolInspect {
117            pending: pending.iter().fold(Default::default(), |mut acc, tx| {
118                insert(&tx.transaction, &mut acc);
119                acc
120            }),
121            queued: queued.iter().fold(Default::default(), |mut acc, tx| {
122                insert(&tx.transaction, &mut acc);
123                acc
124            }),
125        })
126    }
127
128    /// Retrieves the transactions contained within the txpool, returning pending as well as queued
129    /// transactions of this address, grouped by nonce.
130    ///
131    /// See [here](https://geth.ethereum.org/docs/rpc/ns-txpool#txpool_contentFrom) for more details
132    /// Handler for `txpool_contentFrom`
133    async fn txpool_content_from(
134        &self,
135        from: Address,
136    ) -> RpcResult<TxpoolContentFrom<RpcTransaction<Eth::Network>>> {
137        trace!(target: "rpc::eth", ?from, "Serving txpool_contentFrom");
138        Ok(self.content().map_err(Into::into)?.remove_from(&from))
139    }
140
141    /// Returns the details of all transactions currently pending for inclusion in the next
142    /// block(s), as well as the ones that are being scheduled for future execution only.
143    ///
144    /// See [here](https://geth.ethereum.org/docs/rpc/ns-txpool#txpool_content) for more details
145    /// Handler for `txpool_content`
146    async fn txpool_content(&self) -> RpcResult<TxpoolContent<RpcTransaction<Eth::Network>>> {
147        trace!(target: "rpc::eth", "Serving txpool_content");
148        Ok(self.content().map_err(Into::into)?)
149    }
150}
151
152impl<Pool, Eth> fmt::Debug for TxPoolApi<Pool, Eth> {
153    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
154        f.debug_struct("TxpoolApi").finish_non_exhaustive()
155    }
156}