1use crate::{OpEthApi, OpEthApiError, SequencerClient};
4use alloy_primitives::{Bytes, B256};
5use alloy_rpc_types_eth::TransactionInfo;
6use futures::StreamExt;
7use op_alloy_consensus::{transaction::OpTransactionInfo, OpTransaction};
8use reth_chain_state::CanonStateSubscriptions;
9use reth_optimism_primitives::DepositReceipt;
10use reth_primitives_traits::{
11 BlockBody, Recovered, SignedTransaction, SignerRecoverable, WithEncoded,
12};
13use reth_rpc_eth_api::{
14 helpers::{spec::SignersForRpc, EthTransactions, LoadReceipt, LoadTransaction, SpawnBlocking},
15 try_into_op_tx_info, EthApiTypes as _, FromEthApiError, FromEvmError, RpcConvert, RpcNodeCore,
16 RpcReceipt, TxInfoMapper,
17};
18use reth_rpc_eth_types::{EthApiError, TransactionSource};
19use reth_storage_api::{errors::ProviderError, ProviderTx, ReceiptProvider, TransactionsProvider};
20use reth_transaction_pool::{
21 AddedTransactionOutcome, PoolPooledTx, PoolTransaction, TransactionOrigin, TransactionPool,
22};
23use std::{
24 fmt::{Debug, Formatter},
25 future::Future,
26 time::Duration,
27};
28use tokio_stream::wrappers::WatchStream;
29
30impl<N, Rpc> EthTransactions for OpEthApi<N, Rpc>
31where
32 N: RpcNodeCore,
33 OpEthApiError: FromEvmError<N::Evm>,
34 Rpc: RpcConvert<Primitives = N::Primitives, Error = OpEthApiError>,
35{
36 fn signers(&self) -> &SignersForRpc<Self::Provider, Self::NetworkTypes> {
37 self.inner.eth_api.signers()
38 }
39
40 fn send_raw_transaction_sync_timeout(&self) -> Duration {
41 self.inner.eth_api.send_raw_transaction_sync_timeout()
42 }
43
44 async fn send_transaction(
45 &self,
46 tx: WithEncoded<Recovered<PoolPooledTx<Self::Pool>>>,
47 ) -> Result<B256, Self::Error> {
48 let (tx, recovered) = tx.split();
49
50 self.eth_api().broadcast_raw_transaction(tx.clone());
52
53 let pool_transaction = <Self::Pool as TransactionPool>::Transaction::from_pooled(recovered);
54
55 if let Some(client) = self.raw_tx_forwarder().as_ref() {
58 tracing::debug!(target: "rpc::eth", hash = %pool_transaction.hash(), "forwarding raw transaction to sequencer");
59 let hash = client.forward_raw_transaction(&tx).await.inspect_err(|err| {
60 tracing::debug!(target: "rpc::eth", %err, hash=% *pool_transaction.hash(), "failed to forward raw transaction");
61 })?;
62
63 let _ = self.inner.eth_api.add_pool_transaction(pool_transaction).await.inspect_err(|err| {
65 tracing::warn!(target: "rpc::eth", %err, %hash, "successfully sent tx to sequencer, but failed to persist in local tx pool");
66 });
67
68 return Ok(hash)
69 }
70
71 let AddedTransactionOutcome { hash, .. } = self
73 .pool()
74 .add_transaction(TransactionOrigin::Local, pool_transaction)
75 .await
76 .map_err(Self::Error::from_eth_err)?;
77
78 Ok(hash)
79 }
80
81 fn send_raw_transaction_sync(
86 &self,
87 tx: Bytes,
88 ) -> impl Future<Output = Result<RpcReceipt<Self::NetworkTypes>, Self::Error>> + Send {
89 let this = self.clone();
90 let timeout_duration = self.send_raw_transaction_sync_timeout();
91 async move {
92 let mut canonical_stream = this.provider().canonical_state_stream();
93 let hash = EthTransactions::send_raw_transaction(&this, tx).await?;
94 let mut flashblock_stream = this.pending_block_rx().map(WatchStream::new);
95
96 tokio::time::timeout(timeout_duration, async {
97 loop {
98 tokio::select! {
99 biased;
100 flashblock = async {
102 if let Some(stream) = &mut flashblock_stream {
103 stream.next().await
104 } else {
105 futures::future::pending().await
106 }
107 } => {
108 if let Some(flashblock) = flashblock.flatten() {
109 if let Some(receipt) = flashblock
111 .find_and_convert_transaction_receipt(hash, this.converter())
112 {
113 return receipt;
114 }
115 }
116 }
117 canonical_notification = canonical_stream.next() => {
119 if let Some(notification) = canonical_notification {
120 let chain = notification.committed();
121 for block in chain.blocks_iter() {
122 if block.body().contains_transaction(&hash)
123 && let Some(receipt) = this.transaction_receipt(hash).await? {
124 return Ok(receipt);
125 }
126 }
127 } else {
128 break;
130 }
131 }
132 }
133 }
134 Err(Self::Error::from_eth_err(EthApiError::TransactionConfirmationTimeout {
135 hash,
136 duration: timeout_duration,
137 }))
138 })
139 .await
140 .unwrap_or_else(|_elapsed| {
141 Err(Self::Error::from_eth_err(EthApiError::TransactionConfirmationTimeout {
142 hash,
143 duration: timeout_duration,
144 }))
145 })
146 }
147 }
148
149 fn transaction_receipt(
154 &self,
155 hash: B256,
156 ) -> impl Future<Output = Result<Option<RpcReceipt<Self::NetworkTypes>>, Self::Error>> + Send
157 {
158 let this = self.clone();
159 async move {
160 let tx_receipt = this.load_transaction_and_receipt(hash).await?;
162
163 if tx_receipt.is_none() {
164 if let Ok(Some(pending_block)) = this.pending_flashblock().await &&
166 let Some(Ok(receipt)) = pending_block
167 .find_and_convert_transaction_receipt(hash, this.converter())
168 {
169 return Ok(Some(receipt));
170 }
171 }
172 let Some((tx, meta, receipt)) = tx_receipt else { return Ok(None) };
173 self.build_transaction_receipt(tx, meta, receipt).await.map(Some)
174 }
175 }
176}
177
178impl<N, Rpc> LoadTransaction for OpEthApi<N, Rpc>
179where
180 N: RpcNodeCore,
181 OpEthApiError: FromEvmError<N::Evm>,
182 Rpc: RpcConvert<Primitives = N::Primitives, Error = OpEthApiError>,
183{
184 async fn transaction_by_hash(
185 &self,
186 hash: B256,
187 ) -> Result<Option<TransactionSource<ProviderTx<Self::Provider>>>, Self::Error> {
188 if let Some((tx, meta)) = self
190 .spawn_blocking_io(move |this| {
191 this.provider()
192 .transaction_by_hash_with_meta(hash)
193 .map_err(Self::Error::from_eth_err)
194 })
195 .await?
196 {
197 let transaction = tx
198 .try_into_recovered_unchecked()
199 .map_err(|_| EthApiError::InvalidTransactionSignature)?;
200
201 return Ok(Some(TransactionSource::Block {
202 transaction,
203 index: meta.index,
204 block_hash: meta.block_hash,
205 block_number: meta.block_number,
206 base_fee: meta.base_fee,
207 }));
208 }
209
210 if let Ok(Some(pending_block)) = self.pending_flashblock().await &&
212 let Some(indexed_tx) = pending_block.block().find_indexed(hash)
213 {
214 let meta = indexed_tx.meta();
215 return Ok(Some(TransactionSource::Block {
216 transaction: indexed_tx.recovered_tx().cloned(),
217 index: meta.index,
218 block_hash: meta.block_hash,
219 block_number: meta.block_number,
220 base_fee: meta.base_fee,
221 }));
222 }
223
224 if let Some(tx) = self.pool().get(&hash).map(|tx| tx.transaction.clone_into_consensus()) {
226 return Ok(Some(TransactionSource::Pool(tx)));
227 }
228
229 Ok(None)
230 }
231}
232
233impl<N, Rpc> OpEthApi<N, Rpc>
234where
235 N: RpcNodeCore,
236 Rpc: RpcConvert<Primitives = N::Primitives>,
237{
238 pub fn raw_tx_forwarder(&self) -> Option<SequencerClient> {
240 self.inner.sequencer_client.clone()
241 }
242}
243
244pub struct OpTxInfoMapper<Provider> {
249 provider: Provider,
250}
251
252impl<Provider: Clone> Clone for OpTxInfoMapper<Provider> {
253 fn clone(&self) -> Self {
254 Self { provider: self.provider.clone() }
255 }
256}
257
258impl<Provider> Debug for OpTxInfoMapper<Provider> {
259 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
260 f.debug_struct("OpTxInfoMapper").finish()
261 }
262}
263
264impl<Provider> OpTxInfoMapper<Provider> {
265 pub const fn new(provider: Provider) -> Self {
267 Self { provider }
268 }
269}
270
271impl<T, Provider> TxInfoMapper<T> for OpTxInfoMapper<Provider>
272where
273 T: OpTransaction + SignedTransaction,
274 Provider: ReceiptProvider<Receipt: DepositReceipt>,
275{
276 type Out = OpTransactionInfo;
277 type Err = ProviderError;
278
279 fn try_map(&self, tx: &T, tx_info: TransactionInfo) -> Result<Self::Out, ProviderError> {
280 try_into_op_tx_info(&self.provider, tx, tx_info)
281 }
282}