1use crate::{OpEthApi, OpEthApiError, SequencerClient};
4use alloy_consensus::TxReceipt as _;
5use alloy_primitives::{Bytes, B256};
6use alloy_rpc_types_eth::TransactionInfo;
7use futures::StreamExt;
8use op_alloy_consensus::{transaction::OpTransactionInfo, OpTransaction};
9use reth_chain_state::CanonStateSubscriptions;
10use reth_optimism_primitives::DepositReceipt;
11use reth_primitives_traits::{BlockBody, SignedTransaction, SignerRecoverable};
12use reth_rpc_convert::transaction::ConvertReceiptInput;
13use reth_rpc_eth_api::{
14 helpers::{
15 receipt::calculate_gas_used_and_next_log_index, spec::SignersForRpc, EthTransactions,
16 LoadReceipt, LoadTransaction,
17 },
18 try_into_op_tx_info, EthApiTypes as _, FromEthApiError, FromEvmError, RpcConvert, RpcNodeCore,
19 RpcReceipt, TxInfoMapper,
20};
21use reth_rpc_eth_types::{utils::recover_raw_transaction, EthApiError};
22use reth_storage_api::{errors::ProviderError, ReceiptProvider};
23use reth_transaction_pool::{
24 AddedTransactionOutcome, PoolTransaction, TransactionOrigin, TransactionPool,
25};
26use std::{
27 fmt::{Debug, Formatter},
28 future::Future,
29 time::Duration,
30};
31use tokio_stream::wrappers::WatchStream;
32
33impl<N, Rpc> EthTransactions for OpEthApi<N, Rpc>
34where
35 N: RpcNodeCore,
36 OpEthApiError: FromEvmError<N::Evm>,
37 Rpc: RpcConvert<Primitives = N::Primitives, Error = OpEthApiError>,
38{
39 fn signers(&self) -> &SignersForRpc<Self::Provider, Self::NetworkTypes> {
40 self.inner.eth_api.signers()
41 }
42
43 fn send_raw_transaction_sync_timeout(&self) -> Duration {
44 self.inner.eth_api.send_raw_transaction_sync_timeout()
45 }
46
47 async fn send_raw_transaction(&self, tx: Bytes) -> Result<B256, Self::Error> {
51 let recovered = recover_raw_transaction(&tx)?;
52
53 self.eth_api().broadcast_raw_transaction(tx.clone());
55
56 let pool_transaction = <Self::Pool as TransactionPool>::Transaction::from_pooled(recovered);
57
58 if let Some(client) = self.raw_tx_forwarder().as_ref() {
61 tracing::debug!(target: "rpc::eth", hash = %pool_transaction.hash(), "forwarding raw transaction to sequencer");
62 let hash = client.forward_raw_transaction(&tx).await.inspect_err(|err| {
63 tracing::debug!(target: "rpc::eth", %err, hash=% *pool_transaction.hash(), "failed to forward raw transaction");
64 })?;
65
66 let _ = self.inner.eth_api.add_pool_transaction(pool_transaction).await.inspect_err(|err| {
68 tracing::warn!(target: "rpc::eth", %err, %hash, "successfully sent tx to sequencer, but failed to persist in local tx pool");
69 });
70
71 return Ok(hash)
72 }
73
74 let AddedTransactionOutcome { hash, .. } = self
76 .pool()
77 .add_transaction(TransactionOrigin::Local, pool_transaction)
78 .await
79 .map_err(Self::Error::from_eth_err)?;
80
81 Ok(hash)
82 }
83
84 fn send_raw_transaction_sync(
89 &self,
90 tx: Bytes,
91 ) -> impl Future<Output = Result<RpcReceipt<Self::NetworkTypes>, Self::Error>> + Send
92 where
93 Self: LoadReceipt + 'static,
94 {
95 let this = self.clone();
96 let timeout_duration = self.send_raw_transaction_sync_timeout();
97 async move {
98 let hash = EthTransactions::send_raw_transaction(&this, tx).await?;
99 let mut canonical_stream = this.provider().canonical_state_stream();
100 let flashblock_rx = this.pending_block_rx();
101 let mut flashblock_stream = flashblock_rx.map(WatchStream::new);
102
103 tokio::time::timeout(timeout_duration, async {
104 loop {
105 tokio::select! {
106 canonical_notification = canonical_stream.next() => {
108 if let Some(notification) = canonical_notification {
109 let chain = notification.committed();
110 for block in chain.blocks_iter() {
111 if block.body().contains_transaction(&hash)
112 && let Some(receipt) = this.transaction_receipt(hash).await? {
113 return Ok(receipt);
114 }
115 }
116 } else {
117 break;
119 }
120 }
121 _flashblock_update = async {
123 if let Some(ref mut stream) = flashblock_stream {
124 stream.next().await
125 } else {
126 futures::future::pending().await
127 }
128 } => {
129 if let Ok(Some(pending_block)) = this.pending_flashblock().await {
131 let block_and_receipts = pending_block.into_block_and_receipts();
132 if block_and_receipts.block.body().contains_transaction(&hash)
133 && let Some(receipt) = this.transaction_receipt(hash).await? {
134 return Ok(receipt);
135 }
136 }
137 }
138 }
139 }
140 Err(Self::Error::from_eth_err(EthApiError::TransactionConfirmationTimeout {
141 hash,
142 duration: timeout_duration,
143 }))
144 })
145 .await
146 .unwrap_or_else(|_elapsed| {
147 Err(Self::Error::from_eth_err(EthApiError::TransactionConfirmationTimeout {
148 hash,
149 duration: timeout_duration,
150 }))
151 })
152 }
153 }
154
155 fn transaction_receipt(
160 &self,
161 hash: B256,
162 ) -> impl Future<Output = Result<Option<RpcReceipt<Self::NetworkTypes>>, Self::Error>> + Send
163 {
164 let this = self.clone();
165 async move {
166 let tx_receipt = this.load_transaction_and_receipt(hash).await?;
168
169 if tx_receipt.is_none() {
170 if let Ok(Some(pending_block)) = this.pending_flashblock().await {
172 let block_and_receipts = pending_block.into_block_and_receipts();
173 if let Some((tx, receipt)) =
174 block_and_receipts.find_transaction_and_receipt_by_hash(hash)
175 {
176 let meta = tx.meta();
181 let all_receipts = &block_and_receipts.receipts;
182
183 let (gas_used, next_log_index) =
184 calculate_gas_used_and_next_log_index(meta.index, all_receipts);
185
186 return Ok(Some(
187 this.tx_resp_builder()
188 .convert_receipts_with_block(
189 vec![ConvertReceiptInput {
190 tx: tx
191 .tx()
192 .clone()
193 .try_into_recovered_unchecked()
194 .map_err(Self::Error::from_eth_err)?
195 .as_recovered_ref(),
196 gas_used: receipt.cumulative_gas_used() - gas_used,
197 receipt: receipt.clone(),
198 next_log_index,
199 meta,
200 }],
201 block_and_receipts.sealed_block(),
202 )?
203 .pop()
204 .unwrap(),
205 ))
206 }
207 }
208 }
209 let Some((tx, meta, receipt)) = tx_receipt else { return Ok(None) };
210 self.build_transaction_receipt(tx, meta, receipt).await.map(Some)
211 }
212 }
213}
214
215impl<N, Rpc> LoadTransaction for OpEthApi<N, Rpc>
216where
217 N: RpcNodeCore,
218 OpEthApiError: FromEvmError<N::Evm>,
219 Rpc: RpcConvert<Primitives = N::Primitives, Error = OpEthApiError>,
220{
221}
222
223impl<N, Rpc> OpEthApi<N, Rpc>
224where
225 N: RpcNodeCore,
226 Rpc: RpcConvert<Primitives = N::Primitives>,
227{
228 pub fn raw_tx_forwarder(&self) -> Option<SequencerClient> {
230 self.inner.sequencer_client.clone()
231 }
232}
233
/// Optimism `TxInfoMapper` implementation that produces `OpTransactionInfo`.
///
/// Wraps the provider that is handed to `try_into_op_tx_info` when a transaction is mapped.
pub struct OpTxInfoMapper<Provider> {
    /// Provider passed by reference to `try_into_op_tx_info`.
    provider: Provider,
}
241
242impl<Provider: Clone> Clone for OpTxInfoMapper<Provider> {
243 fn clone(&self) -> Self {
244 Self { provider: self.provider.clone() }
245 }
246}
247
248impl<Provider> Debug for OpTxInfoMapper<Provider> {
249 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
250 f.debug_struct("OpTxInfoMapper").finish()
251 }
252}
253
254impl<Provider> OpTxInfoMapper<Provider> {
255 pub const fn new(provider: Provider) -> Self {
257 Self { provider }
258 }
259}
260
261impl<T, Provider> TxInfoMapper<T> for OpTxInfoMapper<Provider>
262where
263 T: OpTransaction + SignedTransaction,
264 Provider: ReceiptProvider<Receipt: DepositReceipt>,
265{
266 type Out = OpTransactionInfo;
267 type Err = ProviderError;
268
269 fn try_map(&self, tx: &T, tx_info: TransactionInfo) -> Result<Self::Out, ProviderError> {
270 try_into_op_tx_info(&self.provider, tx, tx_info)
271 }
272}