1use core::fmt;
2use std::collections::BTreeMap;
3
4use alloy_consensus::Transaction;
5use alloy_primitives::Address;
6use alloy_rpc_types_txpool::{
7 TxpoolContent, TxpoolContentFrom, TxpoolInspect, TxpoolInspectSummary, TxpoolStatus,
8};
9use async_trait::async_trait;
10use jsonrpsee::core::RpcResult;
11use reth_primitives_traits::NodePrimitives;
12use reth_rpc_api::TxPoolApiServer;
13use reth_rpc_convert::{RpcConvert, RpcTypes};
14use reth_rpc_eth_api::RpcTransaction;
15use reth_transaction_pool::{
16 AllPoolTransactions, PoolConsensusTx, PoolTransaction, TransactionPool,
17};
18use tracing::trace;
19
20#[derive(Clone)]
24pub struct TxPoolApi<Pool, Eth> {
25 pool: Pool,
27 tx_resp_builder: Eth,
28}
29
30impl<Pool, Eth> TxPoolApi<Pool, Eth> {
31 pub const fn new(pool: Pool, tx_resp_builder: Eth) -> Self {
33 Self { pool, tx_resp_builder }
34 }
35}
36
37impl<Pool, Eth> TxPoolApi<Pool, Eth>
38where
39 Pool: TransactionPool<Transaction: PoolTransaction<Consensus: Transaction>> + 'static,
40 Eth: RpcConvert<Primitives: NodePrimitives<SignedTx = PoolConsensusTx<Pool>>>,
41{
42 fn content(&self) -> Result<TxpoolContent<RpcTransaction<Eth::Network>>, Eth::Error> {
43 #[inline]
44 fn insert<Tx, RpcTxB>(
45 tx: &Tx,
46 content: &mut BTreeMap<
47 Address,
48 BTreeMap<String, <RpcTxB::Network as RpcTypes>::TransactionResponse>,
49 >,
50 resp_builder: &RpcTxB,
51 ) -> Result<(), RpcTxB::Error>
52 where
53 Tx: PoolTransaction,
54 RpcTxB: RpcConvert<Primitives: NodePrimitives<SignedTx = Tx::Consensus>>,
55 {
56 content.entry(tx.sender()).or_default().insert(
57 tx.nonce().to_string(),
58 resp_builder.fill_pending(tx.clone_into_consensus())?,
59 );
60
61 Ok(())
62 }
63
64 let AllPoolTransactions { pending, queued } = self.pool.all_transactions();
65
66 let mut content = TxpoolContent::default();
67 for pending in pending {
68 insert::<_, Eth>(&pending.transaction, &mut content.pending, &self.tx_resp_builder)?;
69 }
70 for queued in queued {
71 insert::<_, Eth>(&queued.transaction, &mut content.queued, &self.tx_resp_builder)?;
72 }
73
74 Ok(content)
75 }
76}
77
78#[async_trait]
79impl<Pool, Eth> TxPoolApiServer<RpcTransaction<Eth::Network>> for TxPoolApi<Pool, Eth>
80where
81 Pool: TransactionPool<Transaction: PoolTransaction<Consensus: Transaction>> + 'static,
82 Eth: RpcConvert<Primitives: NodePrimitives<SignedTx = PoolConsensusTx<Pool>>> + 'static,
83{
84 async fn txpool_status(&self) -> RpcResult<TxpoolStatus> {
90 trace!(target: "rpc::eth", "Serving txpool_status");
91 let (pending, queued) = self.pool.pending_and_queued_txn_count();
92 Ok(TxpoolStatus { pending: pending as u64, queued: queued as u64 })
93 }
94
95 async fn txpool_inspect(&self) -> RpcResult<TxpoolInspect> {
102 trace!(target: "rpc::eth", "Serving txpool_inspect");
103
104 #[inline]
105 fn insert<T: PoolTransaction<Consensus: Transaction>>(
106 tx: &T,
107 inspect: &mut BTreeMap<Address, BTreeMap<String, TxpoolInspectSummary>>,
108 ) {
109 let entry = inspect.entry(tx.sender()).or_default();
110 let tx = tx.clone_into_consensus();
111 entry.insert(tx.nonce().to_string(), tx.into_inner().into());
112 }
113
114 let AllPoolTransactions { pending, queued } = self.pool.all_transactions();
115
116 Ok(TxpoolInspect {
117 pending: pending.iter().fold(Default::default(), |mut acc, tx| {
118 insert(&tx.transaction, &mut acc);
119 acc
120 }),
121 queued: queued.iter().fold(Default::default(), |mut acc, tx| {
122 insert(&tx.transaction, &mut acc);
123 acc
124 }),
125 })
126 }
127
128 async fn txpool_content_from(
134 &self,
135 from: Address,
136 ) -> RpcResult<TxpoolContentFrom<RpcTransaction<Eth::Network>>> {
137 trace!(target: "rpc::eth", ?from, "Serving txpool_contentFrom");
138 Ok(self.content().map_err(Into::into)?.remove_from(&from))
139 }
140
141 async fn txpool_content(&self) -> RpcResult<TxpoolContent<RpcTransaction<Eth::Network>>> {
147 trace!(target: "rpc::eth", "Serving txpool_content");
148 Ok(self.content().map_err(Into::into)?)
149 }
150}
151
152impl<Pool, Eth> fmt::Debug for TxPoolApi<Pool, Eth> {
153 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
154 f.debug_struct("TxpoolApi").finish_non_exhaustive()
155 }
156}