reth_optimism_rpc/eth/
transaction.rs

1//! Loads and formats OP transaction RPC response.
2
3use crate::{OpEthApi, OpEthApiError, SequencerClient};
4use alloy_consensus::TxReceipt as _;
5use alloy_primitives::{Bytes, B256};
6use alloy_rpc_types_eth::TransactionInfo;
7use futures::StreamExt;
8use op_alloy_consensus::{transaction::OpTransactionInfo, OpTransaction};
9use reth_chain_state::CanonStateSubscriptions;
10use reth_optimism_primitives::DepositReceipt;
11use reth_primitives_traits::{BlockBody, SignedTransaction, SignerRecoverable};
12use reth_rpc_convert::transaction::ConvertReceiptInput;
13use reth_rpc_eth_api::{
14    helpers::{
15        receipt::calculate_gas_used_and_next_log_index, spec::SignersForRpc, EthTransactions,
16        LoadReceipt, LoadTransaction,
17    },
18    try_into_op_tx_info, EthApiTypes as _, FromEthApiError, FromEvmError, RpcConvert, RpcNodeCore,
19    RpcReceipt, TxInfoMapper,
20};
21use reth_rpc_eth_types::{utils::recover_raw_transaction, EthApiError};
22use reth_storage_api::{errors::ProviderError, ReceiptProvider};
23use reth_transaction_pool::{
24    AddedTransactionOutcome, PoolTransaction, TransactionOrigin, TransactionPool,
25};
26use std::{
27    fmt::{Debug, Formatter},
28    future::Future,
29    time::Duration,
30};
31use tokio_stream::wrappers::WatchStream;
32
33impl<N, Rpc> EthTransactions for OpEthApi<N, Rpc>
34where
35    N: RpcNodeCore,
36    OpEthApiError: FromEvmError<N::Evm>,
37    Rpc: RpcConvert<Primitives = N::Primitives, Error = OpEthApiError>,
38{
39    fn signers(&self) -> &SignersForRpc<Self::Provider, Self::NetworkTypes> {
40        self.inner.eth_api.signers()
41    }
42
43    fn send_raw_transaction_sync_timeout(&self) -> Duration {
44        self.inner.eth_api.send_raw_transaction_sync_timeout()
45    }
46
47    /// Decodes and recovers the transaction and submits it to the pool.
48    ///
49    /// Returns the hash of the transaction.
50    async fn send_raw_transaction(&self, tx: Bytes) -> Result<B256, Self::Error> {
51        let recovered = recover_raw_transaction(&tx)?;
52
53        // broadcast raw transaction to subscribers if there is any.
54        self.eth_api().broadcast_raw_transaction(tx.clone());
55
56        let pool_transaction = <Self::Pool as TransactionPool>::Transaction::from_pooled(recovered);
57
58        // On optimism, transactions are forwarded directly to the sequencer to be included in
59        // blocks that it builds.
60        if let Some(client) = self.raw_tx_forwarder().as_ref() {
61            tracing::debug!(target: "rpc::eth", hash = %pool_transaction.hash(), "forwarding raw transaction to sequencer");
62            let hash = client.forward_raw_transaction(&tx).await.inspect_err(|err| {
63                    tracing::debug!(target: "rpc::eth", %err, hash=% *pool_transaction.hash(), "failed to forward raw transaction");
64                })?;
65
66            // Retain tx in local tx pool after forwarding, for local RPC usage.
67            let _ = self.inner.eth_api.add_pool_transaction(pool_transaction).await.inspect_err(|err| {
68                tracing::warn!(target: "rpc::eth", %err, %hash, "successfully sent tx to sequencer, but failed to persist in local tx pool");
69            });
70
71            return Ok(hash)
72        }
73
74        // submit the transaction to the pool with a `Local` origin
75        let AddedTransactionOutcome { hash, .. } = self
76            .pool()
77            .add_transaction(TransactionOrigin::Local, pool_transaction)
78            .await
79            .map_err(Self::Error::from_eth_err)?;
80
81        Ok(hash)
82    }
83
84    /// Decodes and recovers the transaction and submits it to the pool.
85    ///
86    /// And awaits the receipt, checking both canonical blocks and flashblocks for faster
87    /// confirmation.
88    fn send_raw_transaction_sync(
89        &self,
90        tx: Bytes,
91    ) -> impl Future<Output = Result<RpcReceipt<Self::NetworkTypes>, Self::Error>> + Send
92    where
93        Self: LoadReceipt + 'static,
94    {
95        let this = self.clone();
96        let timeout_duration = self.send_raw_transaction_sync_timeout();
97        async move {
98            let hash = EthTransactions::send_raw_transaction(&this, tx).await?;
99            let mut canonical_stream = this.provider().canonical_state_stream();
100            let flashblock_rx = this.pending_block_rx();
101            let mut flashblock_stream = flashblock_rx.map(WatchStream::new);
102
103            tokio::time::timeout(timeout_duration, async {
104                loop {
105                    tokio::select! {
106                        // Listen for regular canonical block updates for inclusion
107                        canonical_notification = canonical_stream.next() => {
108                            if let Some(notification) = canonical_notification {
109                                let chain = notification.committed();
110                                for block in chain.blocks_iter() {
111                                    if block.body().contains_transaction(&hash)
112                                        && let Some(receipt) = this.transaction_receipt(hash).await? {
113                                            return Ok(receipt);
114                                        }
115                                }
116                            } else {
117                                // Canonical stream ended
118                                break;
119                            }
120                        }
121                        // check if the tx was preconfirmed in a new flashblock
122                        _flashblock_update = async {
123                            if let Some(ref mut stream) = flashblock_stream {
124                                stream.next().await
125                            } else {
126                                futures::future::pending().await
127                            }
128                        } => {
129                            // Check flashblocks for faster confirmation (Optimism-specific)
130                            if let Ok(Some(pending_block)) = this.pending_flashblock().await {
131                                let block_and_receipts = pending_block.into_block_and_receipts();
132                                if block_and_receipts.block.body().contains_transaction(&hash)
133                                    && let Some(receipt) = this.transaction_receipt(hash).await? {
134                                        return Ok(receipt);
135                                    }
136                            }
137                        }
138                    }
139                }
140                Err(Self::Error::from_eth_err(EthApiError::TransactionConfirmationTimeout {
141                    hash,
142                    duration: timeout_duration,
143                }))
144            })
145            .await
146            .unwrap_or_else(|_elapsed| {
147                Err(Self::Error::from_eth_err(EthApiError::TransactionConfirmationTimeout {
148                    hash,
149                    duration: timeout_duration,
150                }))
151            })
152        }
153    }
154
155    /// Returns the transaction receipt for the given hash.
156    ///
157    /// With flashblocks, we should also lookup the pending block for the transaction
158    /// because this is considered confirmed/mined.
159    fn transaction_receipt(
160        &self,
161        hash: B256,
162    ) -> impl Future<Output = Result<Option<RpcReceipt<Self::NetworkTypes>>, Self::Error>> + Send
163    {
164        let this = self.clone();
165        async move {
166            // first attempt to fetch the mined transaction receipt data
167            let tx_receipt = this.load_transaction_and_receipt(hash).await?;
168
169            if tx_receipt.is_none() {
170                // if flashblocks are supported, attempt to find id from the pending block
171                if let Ok(Some(pending_block)) = this.pending_flashblock().await {
172                    let block_and_receipts = pending_block.into_block_and_receipts();
173                    if let Some((tx, receipt)) =
174                        block_and_receipts.find_transaction_and_receipt_by_hash(hash)
175                    {
176                        // Build tx receipt from pending block and receipts directly inline.
177                        // This avoids canonical cache lookup that would be done by the
178                        // `build_transaction_receipt` which would result in a block not found
179                        // issue. See: https://github.com/paradigmxyz/reth/issues/18529
180                        let meta = tx.meta();
181                        let all_receipts = &block_and_receipts.receipts;
182
183                        let (gas_used, next_log_index) =
184                            calculate_gas_used_and_next_log_index(meta.index, all_receipts);
185
186                        return Ok(Some(
187                            this.tx_resp_builder()
188                                .convert_receipts_with_block(
189                                    vec![ConvertReceiptInput {
190                                        tx: tx
191                                            .tx()
192                                            .clone()
193                                            .try_into_recovered_unchecked()
194                                            .map_err(Self::Error::from_eth_err)?
195                                            .as_recovered_ref(),
196                                        gas_used: receipt.cumulative_gas_used() - gas_used,
197                                        receipt: receipt.clone(),
198                                        next_log_index,
199                                        meta,
200                                    }],
201                                    block_and_receipts.sealed_block(),
202                                )?
203                                .pop()
204                                .unwrap(),
205                        ))
206                    }
207                }
208            }
209            let Some((tx, meta, receipt)) = tx_receipt else { return Ok(None) };
210            self.build_transaction_receipt(tx, meta, receipt).await.map(Some)
211        }
212    }
213}
214
/// Implements [`LoadTransaction`] for [`OpEthApi`] with an empty body: no item of the trait is
/// overridden here, so every method keeps the behaviour the trait itself provides.
impl<N, Rpc> LoadTransaction for OpEthApi<N, Rpc>
where
    N: RpcNodeCore,
    OpEthApiError: FromEvmError<N::Evm>,
    Rpc: RpcConvert<Primitives = N::Primitives, Error = OpEthApiError>,
{
}
222
223impl<N, Rpc> OpEthApi<N, Rpc>
224where
225    N: RpcNodeCore,
226    Rpc: RpcConvert<Primitives = N::Primitives>,
227{
228    /// Returns the [`SequencerClient`] if one is set.
229    pub fn raw_tx_forwarder(&self) -> Option<SequencerClient> {
230        self.inner.sequencer_client.clone()
231    }
232}
233
/// Optimism implementation of [`TxInfoMapper`].
///
/// For deposits, receipt is fetched to extract `deposit_nonce` and `deposit_receipt_version`.
/// Otherwise, it works like regular Ethereum implementation, i.e. uses [`TransactionInfo`].
///
/// Cloning requires `Provider: Clone` and clones the wrapped provider.
#[derive(Clone)]
pub struct OpTxInfoMapper<Provider> {
    /// Provider handed to `try_into_op_tx_info` when mapping transaction info.
    provider: Provider,
}

// `Debug` is implemented by hand rather than derived so that it is available for every
// `Provider`, including ones that are not `Debug`; the provider itself is not printed.
impl<Provider> Debug for OpTxInfoMapper<Provider> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("OpTxInfoMapper").finish()
    }
}

impl<Provider> OpTxInfoMapper<Provider> {
    /// Creates an [`OpTxInfoMapper`] that takes ownership of the given `provider`.
    pub const fn new(provider: Provider) -> Self {
        Self { provider }
    }
}
260
261impl<T, Provider> TxInfoMapper<T> for OpTxInfoMapper<Provider>
262where
263    T: OpTransaction + SignedTransaction,
264    Provider: ReceiptProvider<Receipt: DepositReceipt>,
265{
266    type Out = OpTransactionInfo;
267    type Err = ProviderError;
268
269    fn try_map(&self, tx: &T, tx_info: TransactionInfo) -> Result<Self::Out, ProviderError> {
270        try_into_op_tx_info(&self.provider, tx, tx_info)
271    }
272}