reth_optimism_rpc/eth/
transaction.rs

1//! Loads and formats OP transaction RPC response.
2
3use crate::{OpEthApi, OpEthApiError, SequencerClient};
4use alloy_primitives::{Bytes, B256};
5use alloy_rpc_types_eth::TransactionInfo;
6use futures::StreamExt;
7use op_alloy_consensus::{transaction::OpTransactionInfo, OpTransaction};
8use reth_chain_state::CanonStateSubscriptions;
9use reth_optimism_primitives::DepositReceipt;
10use reth_primitives_traits::{Recovered, SignedTransaction, SignerRecoverable, WithEncoded};
11use reth_rpc_eth_api::{
12    helpers::{spec::SignersForRpc, EthTransactions, LoadReceipt, LoadTransaction, SpawnBlocking},
13    try_into_op_tx_info, EthApiTypes as _, FromEthApiError, FromEvmError, RpcConvert, RpcNodeCore,
14    RpcReceipt, TxInfoMapper,
15};
16use reth_rpc_eth_types::{block::convert_transaction_receipt, EthApiError, TransactionSource};
17use reth_storage_api::{errors::ProviderError, ProviderTx, ReceiptProvider, TransactionsProvider};
18use reth_transaction_pool::{
19    AddedTransactionOutcome, PoolPooledTx, PoolTransaction, TransactionOrigin, TransactionPool,
20};
21use std::{
22    fmt::{Debug, Formatter},
23    future::Future,
24    time::Duration,
25};
26use tokio_stream::wrappers::WatchStream;
27
28impl<N, Rpc> EthTransactions for OpEthApi<N, Rpc>
29where
30    N: RpcNodeCore,
31    OpEthApiError: FromEvmError<N::Evm>,
32    Rpc: RpcConvert<Primitives = N::Primitives, Error = OpEthApiError>,
33{
34    fn signers(&self) -> &SignersForRpc<Self::Provider, Self::NetworkTypes> {
35        self.inner.eth_api.signers()
36    }
37
38    fn send_raw_transaction_sync_timeout(&self) -> Duration {
39        self.inner.eth_api.send_raw_transaction_sync_timeout()
40    }
41
42    async fn send_transaction(
43        &self,
44        tx: WithEncoded<Recovered<PoolPooledTx<Self::Pool>>>,
45    ) -> Result<B256, Self::Error> {
46        let (tx, recovered) = tx.split();
47
48        // broadcast raw transaction to subscribers if there is any.
49        self.eth_api().broadcast_raw_transaction(tx.clone());
50
51        let pool_transaction = <Self::Pool as TransactionPool>::Transaction::from_pooled(recovered);
52
53        // On optimism, transactions are forwarded directly to the sequencer to be included in
54        // blocks that it builds.
55        if let Some(client) = self.raw_tx_forwarder().as_ref() {
56            tracing::debug!(target: "rpc::eth", hash = %pool_transaction.hash(), "forwarding raw transaction to sequencer");
57            let hash = client.forward_raw_transaction(&tx).await.inspect_err(|err| {
58                    tracing::debug!(target: "rpc::eth", %err, hash=% *pool_transaction.hash(), "failed to forward raw transaction");
59                })?;
60
61            // Retain tx in local tx pool after forwarding, for local RPC usage.
62            let _ = self.inner.eth_api.add_pool_transaction(pool_transaction).await.inspect_err(|err| {
63                tracing::warn!(target: "rpc::eth", %err, %hash, "successfully sent tx to sequencer, but failed to persist in local tx pool");
64            });
65
66            return Ok(hash)
67        }
68
69        // submit the transaction to the pool with a `Local` origin
70        let AddedTransactionOutcome { hash, .. } = self
71            .pool()
72            .add_transaction(TransactionOrigin::Local, pool_transaction)
73            .await
74            .map_err(Self::Error::from_eth_err)?;
75
76        Ok(hash)
77    }
78
79    /// Decodes and recovers the transaction and submits it to the pool.
80    ///
81    /// And awaits the receipt, checking both canonical blocks and flashblocks for faster
82    /// confirmation.
83    fn send_raw_transaction_sync(
84        &self,
85        tx: Bytes,
86    ) -> impl Future<Output = Result<RpcReceipt<Self::NetworkTypes>, Self::Error>> + Send {
87        let this = self.clone();
88        let timeout_duration = self.send_raw_transaction_sync_timeout();
89        async move {
90            let mut canonical_stream = this.provider().canonical_state_stream();
91            let hash = EthTransactions::send_raw_transaction(&this, tx).await?;
92            let mut flashblock_stream = this.pending_block_rx().map(WatchStream::new);
93
94            tokio::time::timeout(timeout_duration, async {
95                loop {
96                    tokio::select! {
97                        biased;
98                        // check if the tx was preconfirmed in a new flashblock
99                        flashblock = async {
100                            if let Some(stream) = &mut flashblock_stream {
101                                stream.next().await
102                            } else {
103                                futures::future::pending().await
104                            }
105                        } => {
106                            if let Some(flashblock) = flashblock.flatten() {
107                                // if flashblocks are supported, attempt to find id from the pending block
108                                if let Some(receipt) = flashblock
109                                .find_and_convert_transaction_receipt(hash, this.converter())
110                                {
111                                    return receipt;
112                                }
113                            }
114                        }
115                        // Listen for regular canonical block updates for inclusion
116                        canonical_notification = canonical_stream.next() => {
117                            if let Some(notification) = canonical_notification {
118                                let chain = notification.committed();
119                                if let Some((block, tx, receipt, all_receipts)) =
120                                    chain.find_transaction_and_receipt_by_hash(hash) &&
121                                    let Some(receipt) = convert_transaction_receipt(
122                                        block,
123                                        all_receipts,
124                                        tx,
125                                        receipt,
126                                        this.converter(),
127                                    )
128                                    .transpose()?
129                                {
130                                    return Ok(receipt);
131                                }
132                            } else {
133                                // Canonical stream ended
134                                break;
135                            }
136                        }
137                    }
138                }
139                Err(Self::Error::from_eth_err(EthApiError::TransactionConfirmationTimeout {
140                    hash,
141                    duration: timeout_duration,
142                }))
143            })
144            .await
145            .unwrap_or_else(|_elapsed| {
146                Err(Self::Error::from_eth_err(EthApiError::TransactionConfirmationTimeout {
147                    hash,
148                    duration: timeout_duration,
149                }))
150            })
151        }
152    }
153
154    /// Returns the transaction receipt for the given hash.
155    ///
156    /// With flashblocks, we should also lookup the pending block for the transaction
157    /// because this is considered confirmed/mined.
158    fn transaction_receipt(
159        &self,
160        hash: B256,
161    ) -> impl Future<Output = Result<Option<RpcReceipt<Self::NetworkTypes>>, Self::Error>> + Send
162    {
163        let this = self.clone();
164        async move {
165            // first attempt to fetch the mined transaction receipt data
166            let tx_receipt = this.load_transaction_and_receipt(hash).await?;
167
168            if tx_receipt.is_none() {
169                // if flashblocks are supported, attempt to find id from the pending block
170                if let Ok(Some(pending_block)) = this.pending_flashblock().await &&
171                    let Some(Ok(receipt)) = pending_block
172                        .find_and_convert_transaction_receipt(hash, this.converter())
173                {
174                    return Ok(Some(receipt));
175                }
176            }
177            let Some((tx, meta, receipt)) = tx_receipt else { return Ok(None) };
178            self.build_transaction_receipt(tx, meta, receipt).await.map(Some)
179        }
180    }
181}
182
183impl<N, Rpc> LoadTransaction for OpEthApi<N, Rpc>
184where
185    N: RpcNodeCore,
186    OpEthApiError: FromEvmError<N::Evm>,
187    Rpc: RpcConvert<Primitives = N::Primitives, Error = OpEthApiError>,
188{
189    async fn transaction_by_hash(
190        &self,
191        hash: B256,
192    ) -> Result<Option<TransactionSource<ProviderTx<Self::Provider>>>, Self::Error> {
193        // 1. Try to find the transaction on disk (historical blocks)
194        if let Some((tx, meta)) = self
195            .spawn_blocking_io(move |this| {
196                this.provider()
197                    .transaction_by_hash_with_meta(hash)
198                    .map_err(Self::Error::from_eth_err)
199            })
200            .await?
201        {
202            let transaction = tx
203                .try_into_recovered_unchecked()
204                .map_err(|_| EthApiError::InvalidTransactionSignature)?;
205
206            return Ok(Some(TransactionSource::Block {
207                transaction,
208                index: meta.index,
209                block_hash: meta.block_hash,
210                block_number: meta.block_number,
211                base_fee: meta.base_fee,
212            }));
213        }
214
215        // 2. check flashblocks (sequencer preconfirmations)
216        if let Ok(Some(pending_block)) = self.pending_flashblock().await &&
217            let Some(indexed_tx) = pending_block.block().find_indexed(hash)
218        {
219            let meta = indexed_tx.meta();
220            return Ok(Some(TransactionSource::Block {
221                transaction: indexed_tx.recovered_tx().cloned(),
222                index: meta.index,
223                block_hash: meta.block_hash,
224                block_number: meta.block_number,
225                base_fee: meta.base_fee,
226            }));
227        }
228
229        // 3. check local pool
230        if let Some(tx) = self.pool().get(&hash).map(|tx| tx.transaction.clone_into_consensus()) {
231            return Ok(Some(TransactionSource::Pool(tx)));
232        }
233
234        Ok(None)
235    }
236}
237
238impl<N, Rpc> OpEthApi<N, Rpc>
239where
240    N: RpcNodeCore,
241    Rpc: RpcConvert<Primitives = N::Primitives>,
242{
243    /// Returns the [`SequencerClient`] if one is set.
244    pub fn raw_tx_forwarder(&self) -> Option<SequencerClient> {
245        self.inner.sequencer_client.clone()
246    }
247}
248
249/// Optimism implementation of [`TxInfoMapper`].
250///
251/// For deposits, receipt is fetched to extract `deposit_nonce` and `deposit_receipt_version`.
252/// Otherwise, it works like regular Ethereum implementation, i.e. uses [`TransactionInfo`].
253pub struct OpTxInfoMapper<Provider> {
254    provider: Provider,
255}
256
257impl<Provider: Clone> Clone for OpTxInfoMapper<Provider> {
258    fn clone(&self) -> Self {
259        Self { provider: self.provider.clone() }
260    }
261}
262
263impl<Provider> Debug for OpTxInfoMapper<Provider> {
264    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
265        f.debug_struct("OpTxInfoMapper").finish()
266    }
267}
268
269impl<Provider> OpTxInfoMapper<Provider> {
270    /// Creates [`OpTxInfoMapper`] that uses [`ReceiptProvider`] borrowed from given `eth_api`.
271    pub const fn new(provider: Provider) -> Self {
272        Self { provider }
273    }
274}
275
276impl<T, Provider> TxInfoMapper<T> for OpTxInfoMapper<Provider>
277where
278    T: OpTransaction + SignedTransaction,
279    Provider: ReceiptProvider<Receipt: DepositReceipt>,
280{
281    type Out = OpTransactionInfo;
282    type Err = ProviderError;
283
284    fn try_map(&self, tx: &T, tx_info: TransactionInfo) -> Result<Self::Out, ProviderError> {
285        try_into_op_tx_info(&self.provider, tx, tx_info)
286    }
287}