1use crate::{OpEthApi, OpEthApiError, SequencerClient};
4use alloy_primitives::{Bytes, B256};
5use alloy_rpc_types_eth::TransactionInfo;
6use futures::StreamExt;
7use op_alloy_consensus::{transaction::OpTransactionInfo, OpTransaction};
8use reth_chain_state::CanonStateSubscriptions;
9use reth_optimism_primitives::DepositReceipt;
10use reth_primitives_traits::{Recovered, SignedTransaction, SignerRecoverable, WithEncoded};
11use reth_rpc_eth_api::{
12 helpers::{spec::SignersForRpc, EthTransactions, LoadReceipt, LoadTransaction, SpawnBlocking},
13 try_into_op_tx_info, EthApiTypes as _, FromEthApiError, FromEvmError, RpcConvert, RpcNodeCore,
14 RpcReceipt, TxInfoMapper,
15};
16use reth_rpc_eth_types::{block::convert_transaction_receipt, EthApiError, TransactionSource};
17use reth_storage_api::{errors::ProviderError, ProviderTx, ReceiptProvider, TransactionsProvider};
18use reth_transaction_pool::{
19 AddedTransactionOutcome, PoolPooledTx, PoolTransaction, TransactionOrigin, TransactionPool,
20};
21use std::{
22 fmt::{Debug, Formatter},
23 future::Future,
24 time::Duration,
25};
26use tokio_stream::wrappers::WatchStream;
27
28impl<N, Rpc> EthTransactions for OpEthApi<N, Rpc>
29where
30 N: RpcNodeCore,
31 OpEthApiError: FromEvmError<N::Evm>,
32 Rpc: RpcConvert<Primitives = N::Primitives, Error = OpEthApiError>,
33{
34 fn signers(&self) -> &SignersForRpc<Self::Provider, Self::NetworkTypes> {
35 self.inner.eth_api.signers()
36 }
37
38 fn send_raw_transaction_sync_timeout(&self) -> Duration {
39 self.inner.eth_api.send_raw_transaction_sync_timeout()
40 }
41
42 async fn send_transaction(
43 &self,
44 tx: WithEncoded<Recovered<PoolPooledTx<Self::Pool>>>,
45 ) -> Result<B256, Self::Error> {
46 let (tx, recovered) = tx.split();
47
48 self.eth_api().broadcast_raw_transaction(tx.clone());
50
51 let pool_transaction = <Self::Pool as TransactionPool>::Transaction::from_pooled(recovered);
52
53 if let Some(client) = self.raw_tx_forwarder().as_ref() {
56 tracing::debug!(target: "rpc::eth", hash = %pool_transaction.hash(), "forwarding raw transaction to sequencer");
57 let hash = client.forward_raw_transaction(&tx).await.inspect_err(|err| {
58 tracing::debug!(target: "rpc::eth", %err, hash=% *pool_transaction.hash(), "failed to forward raw transaction");
59 })?;
60
61 let _ = self.inner.eth_api.add_pool_transaction(pool_transaction).await.inspect_err(|err| {
63 tracing::warn!(target: "rpc::eth", %err, %hash, "successfully sent tx to sequencer, but failed to persist in local tx pool");
64 });
65
66 return Ok(hash)
67 }
68
69 let AddedTransactionOutcome { hash, .. } = self
71 .pool()
72 .add_transaction(TransactionOrigin::Local, pool_transaction)
73 .await
74 .map_err(Self::Error::from_eth_err)?;
75
76 Ok(hash)
77 }
78
79 fn send_raw_transaction_sync(
84 &self,
85 tx: Bytes,
86 ) -> impl Future<Output = Result<RpcReceipt<Self::NetworkTypes>, Self::Error>> + Send {
87 let this = self.clone();
88 let timeout_duration = self.send_raw_transaction_sync_timeout();
89 async move {
90 let mut canonical_stream = this.provider().canonical_state_stream();
91 let hash = EthTransactions::send_raw_transaction(&this, tx).await?;
92 let mut flashblock_stream = this.pending_block_rx().map(WatchStream::new);
93
94 tokio::time::timeout(timeout_duration, async {
95 loop {
96 tokio::select! {
97 biased;
98 flashblock = async {
100 if let Some(stream) = &mut flashblock_stream {
101 stream.next().await
102 } else {
103 futures::future::pending().await
104 }
105 } => {
106 if let Some(flashblock) = flashblock.flatten() {
107 if let Some(receipt) = flashblock
109 .find_and_convert_transaction_receipt(hash, this.converter())
110 {
111 return receipt;
112 }
113 }
114 }
115 canonical_notification = canonical_stream.next() => {
117 if let Some(notification) = canonical_notification {
118 let chain = notification.committed();
119 if let Some((block, tx, receipt, all_receipts)) =
120 chain.find_transaction_and_receipt_by_hash(hash) &&
121 let Some(receipt) = convert_transaction_receipt(
122 block,
123 all_receipts,
124 tx,
125 receipt,
126 this.converter(),
127 )
128 .transpose()?
129 {
130 return Ok(receipt);
131 }
132 } else {
133 break;
135 }
136 }
137 }
138 }
139 Err(Self::Error::from_eth_err(EthApiError::TransactionConfirmationTimeout {
140 hash,
141 duration: timeout_duration,
142 }))
143 })
144 .await
145 .unwrap_or_else(|_elapsed| {
146 Err(Self::Error::from_eth_err(EthApiError::TransactionConfirmationTimeout {
147 hash,
148 duration: timeout_duration,
149 }))
150 })
151 }
152 }
153
154 fn transaction_receipt(
159 &self,
160 hash: B256,
161 ) -> impl Future<Output = Result<Option<RpcReceipt<Self::NetworkTypes>>, Self::Error>> + Send
162 {
163 let this = self.clone();
164 async move {
165 let tx_receipt = this.load_transaction_and_receipt(hash).await?;
167
168 if tx_receipt.is_none() {
169 if let Ok(Some(pending_block)) = this.pending_flashblock().await &&
171 let Some(Ok(receipt)) = pending_block
172 .find_and_convert_transaction_receipt(hash, this.converter())
173 {
174 return Ok(Some(receipt));
175 }
176 }
177 let Some((tx, meta, receipt)) = tx_receipt else { return Ok(None) };
178 self.build_transaction_receipt(tx, meta, receipt).await.map(Some)
179 }
180 }
181}
182
183impl<N, Rpc> LoadTransaction for OpEthApi<N, Rpc>
184where
185 N: RpcNodeCore,
186 OpEthApiError: FromEvmError<N::Evm>,
187 Rpc: RpcConvert<Primitives = N::Primitives, Error = OpEthApiError>,
188{
189 async fn transaction_by_hash(
190 &self,
191 hash: B256,
192 ) -> Result<Option<TransactionSource<ProviderTx<Self::Provider>>>, Self::Error> {
193 if let Some((tx, meta)) = self
195 .spawn_blocking_io(move |this| {
196 this.provider()
197 .transaction_by_hash_with_meta(hash)
198 .map_err(Self::Error::from_eth_err)
199 })
200 .await?
201 {
202 let transaction = tx
203 .try_into_recovered_unchecked()
204 .map_err(|_| EthApiError::InvalidTransactionSignature)?;
205
206 return Ok(Some(TransactionSource::Block {
207 transaction,
208 index: meta.index,
209 block_hash: meta.block_hash,
210 block_number: meta.block_number,
211 base_fee: meta.base_fee,
212 }));
213 }
214
215 if let Ok(Some(pending_block)) = self.pending_flashblock().await &&
217 let Some(indexed_tx) = pending_block.block().find_indexed(hash)
218 {
219 let meta = indexed_tx.meta();
220 return Ok(Some(TransactionSource::Block {
221 transaction: indexed_tx.recovered_tx().cloned(),
222 index: meta.index,
223 block_hash: meta.block_hash,
224 block_number: meta.block_number,
225 base_fee: meta.base_fee,
226 }));
227 }
228
229 if let Some(tx) = self.pool().get(&hash).map(|tx| tx.transaction.clone_into_consensus()) {
231 return Ok(Some(TransactionSource::Pool(tx)));
232 }
233
234 Ok(None)
235 }
236}
237
238impl<N, Rpc> OpEthApi<N, Rpc>
239where
240 N: RpcNodeCore,
241 Rpc: RpcConvert<Primitives = N::Primitives>,
242{
243 pub fn raw_tx_forwarder(&self) -> Option<SequencerClient> {
245 self.inner.sequencer_client.clone()
246 }
247}
248
/// Optimism implementation of [`TxInfoMapper`].
///
/// Wraps a provider that is handed to `try_into_op_tx_info` when a transaction is mapped to
/// its Optimism-specific transaction info.
///
/// `Clone` is derived; the derive adds exactly the `Provider: Clone` bound that the previous
/// hand-written impl declared.
#[derive(Clone)]
pub struct OpTxInfoMapper<Provider> {
    /// Provider passed through to `try_into_op_tx_info`.
    provider: Provider,
}
262
263impl<Provider> Debug for OpTxInfoMapper<Provider> {
264 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
265 f.debug_struct("OpTxInfoMapper").finish()
266 }
267}
268
269impl<Provider> OpTxInfoMapper<Provider> {
270 pub const fn new(provider: Provider) -> Self {
272 Self { provider }
273 }
274}
275
276impl<T, Provider> TxInfoMapper<T> for OpTxInfoMapper<Provider>
277where
278 T: OpTransaction + SignedTransaction,
279 Provider: ReceiptProvider<Receipt: DepositReceipt>,
280{
281 type Out = OpTransactionInfo;
282 type Err = ProviderError;
283
284 fn try_map(&self, tx: &T, tx_info: TransactionInfo) -> Result<Self::Out, ProviderError> {
285 try_into_op_tx_info(&self.provider, tx, tx_info)
286 }
287}