reth_optimism_rpc/eth/
transaction.rs

1//! Loads and formats OP transaction RPC response.
2
3use crate::{OpEthApi, OpEthApiError, SequencerClient};
4use alloy_primitives::{Bytes, B256};
5use alloy_rpc_types_eth::TransactionInfo;
6use futures::StreamExt;
7use op_alloy_consensus::{transaction::OpTransactionInfo, OpTransaction};
8use reth_chain_state::CanonStateSubscriptions;
9use reth_optimism_primitives::DepositReceipt;
10use reth_primitives_traits::{
11    BlockBody, Recovered, SignedTransaction, SignerRecoverable, WithEncoded,
12};
13use reth_rpc_eth_api::{
14    helpers::{spec::SignersForRpc, EthTransactions, LoadReceipt, LoadTransaction, SpawnBlocking},
15    try_into_op_tx_info, EthApiTypes as _, FromEthApiError, FromEvmError, RpcConvert, RpcNodeCore,
16    RpcReceipt, TxInfoMapper,
17};
18use reth_rpc_eth_types::{EthApiError, TransactionSource};
19use reth_storage_api::{errors::ProviderError, ProviderTx, ReceiptProvider, TransactionsProvider};
20use reth_transaction_pool::{
21    AddedTransactionOutcome, PoolPooledTx, PoolTransaction, TransactionOrigin, TransactionPool,
22};
23use std::{
24    fmt::{Debug, Formatter},
25    future::Future,
26    time::Duration,
27};
28use tokio_stream::wrappers::WatchStream;
29
30impl<N, Rpc> EthTransactions for OpEthApi<N, Rpc>
31where
32    N: RpcNodeCore,
33    OpEthApiError: FromEvmError<N::Evm>,
34    Rpc: RpcConvert<Primitives = N::Primitives, Error = OpEthApiError>,
35{
36    fn signers(&self) -> &SignersForRpc<Self::Provider, Self::NetworkTypes> {
37        self.inner.eth_api.signers()
38    }
39
40    fn send_raw_transaction_sync_timeout(&self) -> Duration {
41        self.inner.eth_api.send_raw_transaction_sync_timeout()
42    }
43
44    async fn send_transaction(
45        &self,
46        tx: WithEncoded<Recovered<PoolPooledTx<Self::Pool>>>,
47    ) -> Result<B256, Self::Error> {
48        let (tx, recovered) = tx.split();
49
50        // broadcast raw transaction to subscribers if there is any.
51        self.eth_api().broadcast_raw_transaction(tx.clone());
52
53        let pool_transaction = <Self::Pool as TransactionPool>::Transaction::from_pooled(recovered);
54
55        // On optimism, transactions are forwarded directly to the sequencer to be included in
56        // blocks that it builds.
57        if let Some(client) = self.raw_tx_forwarder().as_ref() {
58            tracing::debug!(target: "rpc::eth", hash = %pool_transaction.hash(), "forwarding raw transaction to sequencer");
59            let hash = client.forward_raw_transaction(&tx).await.inspect_err(|err| {
60                    tracing::debug!(target: "rpc::eth", %err, hash=% *pool_transaction.hash(), "failed to forward raw transaction");
61                })?;
62
63            // Retain tx in local tx pool after forwarding, for local RPC usage.
64            let _ = self.inner.eth_api.add_pool_transaction(pool_transaction).await.inspect_err(|err| {
65                tracing::warn!(target: "rpc::eth", %err, %hash, "successfully sent tx to sequencer, but failed to persist in local tx pool");
66            });
67
68            return Ok(hash)
69        }
70
71        // submit the transaction to the pool with a `Local` origin
72        let AddedTransactionOutcome { hash, .. } = self
73            .pool()
74            .add_transaction(TransactionOrigin::Local, pool_transaction)
75            .await
76            .map_err(Self::Error::from_eth_err)?;
77
78        Ok(hash)
79    }
80
81    /// Decodes and recovers the transaction and submits it to the pool.
82    ///
83    /// And awaits the receipt, checking both canonical blocks and flashblocks for faster
84    /// confirmation.
85    fn send_raw_transaction_sync(
86        &self,
87        tx: Bytes,
88    ) -> impl Future<Output = Result<RpcReceipt<Self::NetworkTypes>, Self::Error>> + Send {
89        let this = self.clone();
90        let timeout_duration = self.send_raw_transaction_sync_timeout();
91        async move {
92            let mut canonical_stream = this.provider().canonical_state_stream();
93            let hash = EthTransactions::send_raw_transaction(&this, tx).await?;
94            let mut flashblock_stream = this.pending_block_rx().map(WatchStream::new);
95
96            tokio::time::timeout(timeout_duration, async {
97                loop {
98                    tokio::select! {
99                        biased;
100                        // check if the tx was preconfirmed in a new flashblock
101                        flashblock = async {
102                            if let Some(stream) = &mut flashblock_stream {
103                                stream.next().await
104                            } else {
105                                futures::future::pending().await
106                            }
107                        } => {
108                            if let Some(flashblock) = flashblock.flatten() {
109                                // if flashblocks are supported, attempt to find id from the pending block
110                                if let Some(receipt) = flashblock
111                                .find_and_convert_transaction_receipt(hash, this.converter())
112                                {
113                                    return receipt;
114                                }
115                            }
116                        }
117                        // Listen for regular canonical block updates for inclusion
118                        canonical_notification = canonical_stream.next() => {
119                            if let Some(notification) = canonical_notification {
120                                let chain = notification.committed();
121                                for block in chain.blocks_iter() {
122                                    if block.body().contains_transaction(&hash)
123                                        && let Some(receipt) = this.transaction_receipt(hash).await? {
124                                            return Ok(receipt);
125                                        }
126                                }
127                            } else {
128                                // Canonical stream ended
129                                break;
130                            }
131                        }
132                    }
133                }
134                Err(Self::Error::from_eth_err(EthApiError::TransactionConfirmationTimeout {
135                    hash,
136                    duration: timeout_duration,
137                }))
138            })
139            .await
140            .unwrap_or_else(|_elapsed| {
141                Err(Self::Error::from_eth_err(EthApiError::TransactionConfirmationTimeout {
142                    hash,
143                    duration: timeout_duration,
144                }))
145            })
146        }
147    }
148
149    /// Returns the transaction receipt for the given hash.
150    ///
151    /// With flashblocks, we should also lookup the pending block for the transaction
152    /// because this is considered confirmed/mined.
153    fn transaction_receipt(
154        &self,
155        hash: B256,
156    ) -> impl Future<Output = Result<Option<RpcReceipt<Self::NetworkTypes>>, Self::Error>> + Send
157    {
158        let this = self.clone();
159        async move {
160            // first attempt to fetch the mined transaction receipt data
161            let tx_receipt = this.load_transaction_and_receipt(hash).await?;
162
163            if tx_receipt.is_none() {
164                // if flashblocks are supported, attempt to find id from the pending block
165                if let Ok(Some(pending_block)) = this.pending_flashblock().await &&
166                    let Some(Ok(receipt)) = pending_block
167                        .find_and_convert_transaction_receipt(hash, this.converter())
168                {
169                    return Ok(Some(receipt));
170                }
171            }
172            let Some((tx, meta, receipt)) = tx_receipt else { return Ok(None) };
173            self.build_transaction_receipt(tx, meta, receipt).await.map(Some)
174        }
175    }
176}
177
178impl<N, Rpc> LoadTransaction for OpEthApi<N, Rpc>
179where
180    N: RpcNodeCore,
181    OpEthApiError: FromEvmError<N::Evm>,
182    Rpc: RpcConvert<Primitives = N::Primitives, Error = OpEthApiError>,
183{
184    async fn transaction_by_hash(
185        &self,
186        hash: B256,
187    ) -> Result<Option<TransactionSource<ProviderTx<Self::Provider>>>, Self::Error> {
188        // 1. Try to find the transaction on disk (historical blocks)
189        if let Some((tx, meta)) = self
190            .spawn_blocking_io(move |this| {
191                this.provider()
192                    .transaction_by_hash_with_meta(hash)
193                    .map_err(Self::Error::from_eth_err)
194            })
195            .await?
196        {
197            let transaction = tx
198                .try_into_recovered_unchecked()
199                .map_err(|_| EthApiError::InvalidTransactionSignature)?;
200
201            return Ok(Some(TransactionSource::Block {
202                transaction,
203                index: meta.index,
204                block_hash: meta.block_hash,
205                block_number: meta.block_number,
206                base_fee: meta.base_fee,
207            }));
208        }
209
210        // 2. check flashblocks (sequencer preconfirmations)
211        if let Ok(Some(pending_block)) = self.pending_flashblock().await &&
212            let Some(indexed_tx) = pending_block.block().find_indexed(hash)
213        {
214            let meta = indexed_tx.meta();
215            return Ok(Some(TransactionSource::Block {
216                transaction: indexed_tx.recovered_tx().cloned(),
217                index: meta.index,
218                block_hash: meta.block_hash,
219                block_number: meta.block_number,
220                base_fee: meta.base_fee,
221            }));
222        }
223
224        // 3. check local pool
225        if let Some(tx) = self.pool().get(&hash).map(|tx| tx.transaction.clone_into_consensus()) {
226            return Ok(Some(TransactionSource::Pool(tx)));
227        }
228
229        Ok(None)
230    }
231}
232
233impl<N, Rpc> OpEthApi<N, Rpc>
234where
235    N: RpcNodeCore,
236    Rpc: RpcConvert<Primitives = N::Primitives>,
237{
238    /// Returns the [`SequencerClient`] if one is set.
239    pub fn raw_tx_forwarder(&self) -> Option<SequencerClient> {
240        self.inner.sequencer_client.clone()
241    }
242}
243
244/// Optimism implementation of [`TxInfoMapper`].
245///
246/// For deposits, receipt is fetched to extract `deposit_nonce` and `deposit_receipt_version`.
247/// Otherwise, it works like regular Ethereum implementation, i.e. uses [`TransactionInfo`].
248pub struct OpTxInfoMapper<Provider> {
249    provider: Provider,
250}
251
252impl<Provider: Clone> Clone for OpTxInfoMapper<Provider> {
253    fn clone(&self) -> Self {
254        Self { provider: self.provider.clone() }
255    }
256}
257
258impl<Provider> Debug for OpTxInfoMapper<Provider> {
259    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
260        f.debug_struct("OpTxInfoMapper").finish()
261    }
262}
263
264impl<Provider> OpTxInfoMapper<Provider> {
265    /// Creates [`OpTxInfoMapper`] that uses [`ReceiptProvider`] borrowed from given `eth_api`.
266    pub const fn new(provider: Provider) -> Self {
267        Self { provider }
268    }
269}
270
271impl<T, Provider> TxInfoMapper<T> for OpTxInfoMapper<Provider>
272where
273    T: OpTransaction + SignedTransaction,
274    Provider: ReceiptProvider<Receipt: DepositReceipt>,
275{
276    type Out = OpTransactionInfo;
277    type Err = ProviderError;
278
279    fn try_map(&self, tx: &T, tx_info: TransactionInfo) -> Result<Self::Out, ProviderError> {
280        try_into_op_tx_info(&self.provider, tx, tx_info)
281    }
282}