reth_rpc/
txpool.rs
1use core::fmt;
2use std::collections::BTreeMap;
3
4use alloy_consensus::Transaction;
5use alloy_primitives::Address;
6use alloy_rpc_types_txpool::{
7 TxpoolContent, TxpoolContentFrom, TxpoolInspect, TxpoolInspectSummary, TxpoolStatus,
8};
9use async_trait::async_trait;
10use jsonrpsee::core::RpcResult;
11use reth_rpc_api::TxPoolApiServer;
12use reth_rpc_types_compat::TransactionCompat;
13use reth_transaction_pool::{
14 AllPoolTransactions, PoolConsensusTx, PoolTransaction, TransactionPool,
15};
16use tracing::trace;
17
18#[derive(Clone)]
22pub struct TxPoolApi<Pool, Eth> {
23 pool: Pool,
25 tx_resp_builder: Eth,
26}
27
28impl<Pool, Eth> TxPoolApi<Pool, Eth> {
29 pub const fn new(pool: Pool, tx_resp_builder: Eth) -> Self {
31 Self { pool, tx_resp_builder }
32 }
33}
34
35impl<Pool, Eth> TxPoolApi<Pool, Eth>
36where
37 Pool: TransactionPool<Transaction: PoolTransaction<Consensus: Transaction>> + 'static,
38 Eth: TransactionCompat<PoolConsensusTx<Pool>>,
39{
40 fn content(&self) -> Result<TxpoolContent<Eth::Transaction>, Eth::Error> {
41 #[inline]
42 fn insert<Tx, RpcTxB>(
43 tx: &Tx,
44 content: &mut BTreeMap<Address, BTreeMap<String, RpcTxB::Transaction>>,
45 resp_builder: &RpcTxB,
46 ) -> Result<(), RpcTxB::Error>
47 where
48 Tx: PoolTransaction,
49 RpcTxB: TransactionCompat<Tx::Consensus>,
50 {
51 content.entry(tx.sender()).or_default().insert(
52 tx.nonce().to_string(),
53 resp_builder.fill_pending(tx.clone_into_consensus())?,
54 );
55
56 Ok(())
57 }
58
59 let AllPoolTransactions { pending, queued } = self.pool.all_transactions();
60
61 let mut content = TxpoolContent { pending: BTreeMap::new(), queued: BTreeMap::new() };
62 for pending in pending {
63 insert::<_, Eth>(&pending.transaction, &mut content.pending, &self.tx_resp_builder)?;
64 }
65 for queued in queued {
66 insert::<_, Eth>(&queued.transaction, &mut content.queued, &self.tx_resp_builder)?;
67 }
68
69 Ok(content)
70 }
71}
72
73#[async_trait]
74impl<Pool, Eth> TxPoolApiServer<Eth::Transaction> for TxPoolApi<Pool, Eth>
75where
76 Pool: TransactionPool<Transaction: PoolTransaction<Consensus: Transaction>> + 'static,
77 Eth: TransactionCompat<PoolConsensusTx<Pool>> + 'static,
78{
79 async fn txpool_status(&self) -> RpcResult<TxpoolStatus> {
85 trace!(target: "rpc::eth", "Serving txpool_status");
86 let all = self.pool.all_transactions();
87 Ok(TxpoolStatus { pending: all.pending.len() as u64, queued: all.queued.len() as u64 })
88 }
89
90 async fn txpool_inspect(&self) -> RpcResult<TxpoolInspect> {
97 trace!(target: "rpc::eth", "Serving txpool_inspect");
98
99 #[inline]
100 fn insert<T: PoolTransaction<Consensus: Transaction>>(
101 tx: &T,
102 inspect: &mut BTreeMap<Address, BTreeMap<String, TxpoolInspectSummary>>,
103 ) {
104 let entry = inspect.entry(tx.sender()).or_default();
105 let tx = tx.clone_into_consensus();
106 entry.insert(tx.nonce().to_string(), tx.into_inner().into());
107 }
108
109 let AllPoolTransactions { pending, queued } = self.pool.all_transactions();
110
111 Ok(TxpoolInspect {
112 pending: pending.iter().fold(Default::default(), |mut acc, tx| {
113 insert(&tx.transaction, &mut acc);
114 acc
115 }),
116 queued: queued.iter().fold(Default::default(), |mut acc, tx| {
117 insert(&tx.transaction, &mut acc);
118 acc
119 }),
120 })
121 }
122
123 async fn txpool_content_from(
129 &self,
130 from: Address,
131 ) -> RpcResult<TxpoolContentFrom<Eth::Transaction>> {
132 trace!(target: "rpc::eth", ?from, "Serving txpool_contentFrom");
133 Ok(self.content().map_err(Into::into)?.remove_from(&from))
134 }
135
136 async fn txpool_content(&self) -> RpcResult<TxpoolContent<Eth::Transaction>> {
142 trace!(target: "rpc::eth", "Serving txpool_content");
143 Ok(self.content().map_err(Into::into)?)
144 }
145}
146
147impl<Pool, Eth> fmt::Debug for TxPoolApi<Pool, Eth> {
148 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
149 f.debug_struct("TxpoolApi").finish_non_exhaustive()
150 }
151}