reth_rpc_eth_api/helpers/
transaction.rs1use super::{EthApiSpec, EthSigner, LoadBlock, LoadReceipt, LoadState, SpawnBlocking};
5use crate::{
6 helpers::{estimate::EstimateCall, spec::SignersForRpc},
7 FromEthApiError, FullEthApiTypes, IntoEthApiError, RpcNodeCore, RpcNodeCoreExt, RpcReceipt,
8 RpcTransaction,
9};
10use alloy_consensus::{
11 transaction::{SignerRecoverable, TransactionMeta},
12 BlockHeader, Transaction,
13};
14use alloy_dyn_abi::TypedData;
15use alloy_eips::{eip2718::Encodable2718, BlockId};
16use alloy_network::TransactionBuilder;
17use alloy_primitives::{Address, Bytes, TxHash, B256};
18use alloy_rpc_types_eth::{BlockNumberOrTag, TransactionInfo};
19use futures::{Future, StreamExt};
20use reth_chain_state::CanonStateSubscriptions;
21use reth_node_api::BlockBody;
22use reth_primitives_traits::{RecoveredBlock, SignedTransaction};
23use reth_rpc_convert::{transaction::RpcConvert, RpcTxReq};
24use reth_rpc_eth_types::{
25 utils::binary_search, EthApiError, EthApiError::TransactionConfirmationTimeout, SignError,
26 TransactionSource,
27};
28use reth_storage_api::{
29 BlockNumReader, BlockReaderIdExt, ProviderBlock, ProviderReceipt, ProviderTx, ReceiptProvider,
30 TransactionsProvider,
31};
32use reth_transaction_pool::{
33 AddedTransactionOutcome, PoolTransaction, TransactionOrigin, TransactionPool,
34};
35use std::sync::Arc;
36
37pub trait EthTransactions: LoadTransaction<Provider: BlockReaderIdExt> {
60 fn signers(&self) -> &SignersForRpc<Self::Provider, Self::NetworkTypes>;
64
65 fn send_raw_transaction(
69 &self,
70 tx: Bytes,
71 ) -> impl Future<Output = Result<B256, Self::Error>> + Send;
72
73 fn send_raw_transaction_sync(
77 &self,
78 tx: Bytes,
79 ) -> impl Future<Output = Result<RpcReceipt<Self::NetworkTypes>, Self::Error>> + Send
80 where
81 Self: LoadReceipt + 'static,
82 {
83 let this = self.clone();
84 async move {
85 let hash = EthTransactions::send_raw_transaction(&this, tx).await?;
86 let mut stream = this.provider().canonical_state_stream();
87 const TIMEOUT_DURATION: tokio::time::Duration = tokio::time::Duration::from_secs(30);
88 tokio::time::timeout(TIMEOUT_DURATION, async {
89 while let Some(notification) = stream.next().await {
90 let chain = notification.committed();
91 for block in chain.blocks_iter() {
92 if block.body().contains_transaction(&hash) {
93 if let Some(receipt) = this.transaction_receipt(hash).await? {
94 return Ok(receipt);
95 }
96 }
97 }
98 }
99 Err(Self::Error::from_eth_err(TransactionConfirmationTimeout {
100 hash,
101 duration: TIMEOUT_DURATION,
102 }))
103 })
104 .await
105 .unwrap_or_else(|_elapsed| {
106 Err(Self::Error::from_eth_err(TransactionConfirmationTimeout {
107 hash,
108 duration: TIMEOUT_DURATION,
109 }))
110 })
111 }
112 }
113
114 #[expect(clippy::complexity)]
120 fn transaction_by_hash(
121 &self,
122 hash: B256,
123 ) -> impl Future<
124 Output = Result<Option<TransactionSource<ProviderTx<Self::Provider>>>, Self::Error>,
125 > + Send {
126 LoadTransaction::transaction_by_hash(self, hash)
127 }
128
129 #[expect(clippy::type_complexity)]
133 fn transactions_by_block(
134 &self,
135 block: B256,
136 ) -> impl Future<Output = Result<Option<Vec<ProviderTx<Self::Provider>>>, Self::Error>> + Send
137 {
138 async move {
139 self.cache()
140 .get_recovered_block(block)
141 .await
142 .map(|b| b.map(|b| b.body().transactions().to_vec()))
143 .map_err(Self::Error::from_eth_err)
144 }
145 }
146
147 fn raw_transaction_by_hash(
155 &self,
156 hash: B256,
157 ) -> impl Future<Output = Result<Option<Bytes>, Self::Error>> + Send {
158 async move {
159 if let Some(tx) =
161 self.pool().get_pooled_transaction_element(hash).map(|tx| tx.encoded_2718().into())
162 {
163 return Ok(Some(tx))
164 }
165
166 self.spawn_blocking_io(move |ref this| {
167 Ok(this
168 .provider()
169 .transaction_by_hash(hash)
170 .map_err(Self::Error::from_eth_err)?
171 .map(|tx| tx.encoded_2718().into()))
172 })
173 .await
174 }
175 }
176
177 #[expect(clippy::type_complexity)]
179 fn historical_transaction_by_hash_at(
180 &self,
181 hash: B256,
182 ) -> impl Future<
183 Output = Result<Option<(TransactionSource<ProviderTx<Self::Provider>>, B256)>, Self::Error>,
184 > + Send {
185 async move {
186 match self.transaction_by_hash_at(hash).await? {
187 None => Ok(None),
188 Some((tx, at)) => Ok(at.as_block_hash().map(|hash| (tx, hash))),
189 }
190 }
191 }
192
193 fn transaction_receipt(
198 &self,
199 hash: B256,
200 ) -> impl Future<Output = Result<Option<RpcReceipt<Self::NetworkTypes>>, Self::Error>> + Send
201 where
202 Self: LoadReceipt + 'static,
203 {
204 async move {
205 match self.load_transaction_and_receipt(hash).await? {
206 Some((tx, meta, receipt)) => {
207 self.build_transaction_receipt(tx, meta, receipt).await.map(Some)
208 }
209 None => Ok(None),
210 }
211 }
212 }
213
214 #[expect(clippy::complexity)]
216 fn load_transaction_and_receipt(
217 &self,
218 hash: TxHash,
219 ) -> impl Future<
220 Output = Result<
221 Option<(ProviderTx<Self::Provider>, TransactionMeta, ProviderReceipt<Self::Provider>)>,
222 Self::Error,
223 >,
224 > + Send
225 where
226 Self: 'static,
227 {
228 self.spawn_blocking_io(move |this| {
229 let provider = this.provider();
230 let (tx, meta) = match provider
231 .transaction_by_hash_with_meta(hash)
232 .map_err(Self::Error::from_eth_err)?
233 {
234 Some((tx, meta)) => (tx, meta),
235 None => return Ok(None),
236 };
237
238 let receipt = match provider.receipt_by_hash(hash).map_err(Self::Error::from_eth_err)? {
239 Some(recpt) => recpt,
240 None => return Ok(None),
241 };
242
243 Ok(Some((tx, meta, receipt)))
244 })
245 }
246
247 fn transaction_by_block_and_tx_index(
251 &self,
252 block_id: BlockId,
253 index: usize,
254 ) -> impl Future<Output = Result<Option<RpcTransaction<Self::NetworkTypes>>, Self::Error>> + Send
255 where
256 Self: LoadBlock,
257 {
258 async move {
259 if let Some(block) = self.recovered_block(block_id).await? {
260 let block_hash = block.hash();
261 let block_number = block.number();
262 let base_fee_per_gas = block.base_fee_per_gas();
263 if let Some((signer, tx)) = block.transactions_with_sender().nth(index) {
264 let tx_info = TransactionInfo {
265 hash: Some(*tx.tx_hash()),
266 block_hash: Some(block_hash),
267 block_number: Some(block_number),
268 base_fee: base_fee_per_gas,
269 index: Some(index as u64),
270 };
271
272 return Ok(Some(
273 self.tx_resp_builder().fill(tx.clone().with_signer(*signer), tx_info)?,
274 ))
275 }
276 }
277
278 Ok(None)
279 }
280 }
281
282 fn get_transaction_by_sender_and_nonce(
284 &self,
285 sender: Address,
286 nonce: u64,
287 include_pending: bool,
288 ) -> impl Future<Output = Result<Option<RpcTransaction<Self::NetworkTypes>>, Self::Error>> + Send
289 where
290 Self: LoadBlock + LoadState,
291 {
292 async move {
293 if include_pending {
295 if let Some(tx) =
296 RpcNodeCore::pool(self).get_transaction_by_sender_and_nonce(sender, nonce)
297 {
298 let transaction = tx.transaction.clone_into_consensus();
299 return Ok(Some(self.tx_resp_builder().fill_pending(transaction)?));
300 }
301 }
302
303 if !self.get_code(sender, None).await?.is_empty() {
305 return Ok(None);
306 }
307
308 let highest = self.transaction_count(sender, None).await?.saturating_to::<u64>();
309
310 if nonce >= highest {
313 return Ok(None);
314 }
315
316 let Ok(high) = self.provider().best_block_number() else {
317 return Err(EthApiError::HeaderNotFound(BlockNumberOrTag::Latest.into()).into());
318 };
319
320 let num = binary_search::<_, _, Self::Error>(1, high, |mid| async move {
323 let mid_nonce =
324 self.transaction_count(sender, Some(mid.into())).await?.saturating_to::<u64>();
325
326 Ok(mid_nonce > nonce)
327 })
328 .await?;
329
330 let block_id = num.into();
331 self.recovered_block(block_id)
332 .await?
333 .and_then(|block| {
334 let block_hash = block.hash();
335 let block_number = block.number();
336 let base_fee_per_gas = block.base_fee_per_gas();
337
338 block
339 .transactions_with_sender()
340 .enumerate()
341 .find(|(_, (signer, tx))| **signer == sender && (*tx).nonce() == nonce)
342 .map(|(index, (signer, tx))| {
343 let tx_info = TransactionInfo {
344 hash: Some(*tx.tx_hash()),
345 block_hash: Some(block_hash),
346 block_number: Some(block_number),
347 base_fee: base_fee_per_gas,
348 index: Some(index as u64),
349 };
350 self.tx_resp_builder().fill(tx.clone().with_signer(*signer), tx_info)
351 })
352 })
353 .ok_or(EthApiError::HeaderNotFound(block_id))?
354 .map(Some)
355 }
356 }
357
358 fn raw_transaction_by_block_and_tx_index(
362 &self,
363 block_id: BlockId,
364 index: usize,
365 ) -> impl Future<Output = Result<Option<Bytes>, Self::Error>> + Send
366 where
367 Self: LoadBlock,
368 {
369 async move {
370 if let Some(block) = self.recovered_block(block_id).await? {
371 if let Some(tx) = block.body().transactions().get(index) {
372 return Ok(Some(tx.encoded_2718().into()))
373 }
374 }
375
376 Ok(None)
377 }
378 }
379
380 fn send_transaction(
383 &self,
384 mut request: RpcTxReq<Self::NetworkTypes>,
385 ) -> impl Future<Output = Result<B256, Self::Error>> + Send
386 where
387 Self: EthApiSpec + LoadBlock + EstimateCall,
388 {
389 async move {
390 let from = match request.as_ref().from() {
391 Some(from) => from,
392 None => return Err(SignError::NoAccount.into_eth_err()),
393 };
394
395 if self.find_signer(&from).is_err() {
396 return Err(SignError::NoAccount.into_eth_err())
397 }
398
399 if request.as_ref().nonce().is_none() {
401 let nonce = self.next_available_nonce(from).await?;
402 request.as_mut().set_nonce(nonce);
403 }
404
405 let chain_id = self.chain_id();
406 request.as_mut().set_chain_id(chain_id.to());
407
408 let estimated_gas =
409 self.estimate_gas_at(request.clone(), BlockId::pending(), None).await?;
410 let gas_limit = estimated_gas;
411 request.as_mut().set_gas_limit(gas_limit.to());
412
413 let transaction = self.sign_request(&from, request).await?.with_signer(from);
414
415 let pool_transaction =
416 <<Self as RpcNodeCore>::Pool as TransactionPool>::Transaction::try_from_consensus(
417 transaction,
418 )
419 .map_err(|_| EthApiError::TransactionConversionError)?;
420
421 let AddedTransactionOutcome { hash, .. } = self
423 .pool()
424 .add_transaction(TransactionOrigin::Local, pool_transaction)
425 .await
426 .map_err(Self::Error::from_eth_err)?;
427
428 Ok(hash)
429 }
430 }
431
432 fn sign_request(
434 &self,
435 from: &Address,
436 txn: RpcTxReq<Self::NetworkTypes>,
437 ) -> impl Future<Output = Result<ProviderTx<Self::Provider>, Self::Error>> + Send {
438 async move {
439 self.find_signer(from)?
440 .sign_transaction(txn, from)
441 .await
442 .map_err(Self::Error::from_eth_err)
443 }
444 }
445
446 fn sign(
448 &self,
449 account: Address,
450 message: Bytes,
451 ) -> impl Future<Output = Result<Bytes, Self::Error>> + Send {
452 async move {
453 Ok(self
454 .find_signer(&account)?
455 .sign(account, &message)
456 .await
457 .map_err(Self::Error::from_eth_err)?
458 .as_bytes()
459 .into())
460 }
461 }
462
463 fn sign_transaction(
466 &self,
467 request: RpcTxReq<Self::NetworkTypes>,
468 ) -> impl Future<Output = Result<Bytes, Self::Error>> + Send {
469 async move {
470 let from = match request.as_ref().from() {
471 Some(from) => from,
472 None => return Err(SignError::NoAccount.into_eth_err()),
473 };
474
475 Ok(self.sign_request(&from, request).await?.encoded_2718().into())
476 }
477 }
478
479 fn sign_typed_data(&self, data: &TypedData, account: Address) -> Result<Bytes, Self::Error> {
481 Ok(self
482 .find_signer(&account)?
483 .sign_typed_data(account, data)
484 .map_err(Self::Error::from_eth_err)?
485 .as_bytes()
486 .into())
487 }
488
489 #[expect(clippy::type_complexity)]
491 fn find_signer(
492 &self,
493 account: &Address,
494 ) -> Result<
495 Box<dyn EthSigner<ProviderTx<Self::Provider>, RpcTxReq<Self::NetworkTypes>> + 'static>,
496 Self::Error,
497 > {
498 self.signers()
499 .read()
500 .iter()
501 .find(|signer| signer.is_signer_for(account))
502 .map(|signer| dyn_clone::clone_box(&**signer))
503 .ok_or_else(|| SignError::NoAccount.into_eth_err())
504 }
505}
506
507pub trait LoadTransaction: SpawnBlocking + FullEthApiTypes + RpcNodeCoreExt {
512 #[expect(clippy::complexity)]
518 fn transaction_by_hash(
519 &self,
520 hash: B256,
521 ) -> impl Future<
522 Output = Result<Option<TransactionSource<ProviderTx<Self::Provider>>>, Self::Error>,
523 > + Send {
524 async move {
525 let mut resp = self
527 .spawn_blocking_io(move |this| {
528 match this
529 .provider()
530 .transaction_by_hash_with_meta(hash)
531 .map_err(Self::Error::from_eth_err)?
532 {
533 None => Ok(None),
534 Some((tx, meta)) => {
535 let transaction = tx
539 .try_into_recovered_unchecked()
540 .map_err(|_| EthApiError::InvalidTransactionSignature)?;
541
542 let tx = TransactionSource::Block {
543 transaction,
544 index: meta.index,
545 block_hash: meta.block_hash,
546 block_number: meta.block_number,
547 base_fee: meta.base_fee,
548 };
549 Ok(Some(tx))
550 }
551 }
552 })
553 .await?;
554
555 if resp.is_none() {
556 if let Some(tx) =
558 self.pool().get(&hash).map(|tx| tx.transaction.clone().into_consensus())
559 {
560 resp = Some(TransactionSource::Pool(tx.into()));
561 }
562 }
563
564 Ok(resp)
565 }
566 }
567
568 #[expect(clippy::type_complexity)]
572 fn transaction_by_hash_at(
573 &self,
574 transaction_hash: B256,
575 ) -> impl Future<
576 Output = Result<
577 Option<(TransactionSource<ProviderTx<Self::Provider>>, BlockId)>,
578 Self::Error,
579 >,
580 > + Send {
581 async move {
582 Ok(self.transaction_by_hash(transaction_hash).await?.map(|tx| match tx {
583 tx @ TransactionSource::Pool(_) => (tx, BlockId::pending()),
584 tx @ TransactionSource::Block { block_hash, .. } => {
585 (tx, BlockId::Hash(block_hash.into()))
586 }
587 }))
588 }
589 }
590
591 #[expect(clippy::type_complexity)]
593 fn transaction_and_block(
594 &self,
595 hash: B256,
596 ) -> impl Future<
597 Output = Result<
598 Option<(
599 TransactionSource<ProviderTx<Self::Provider>>,
600 Arc<RecoveredBlock<ProviderBlock<Self::Provider>>>,
601 )>,
602 Self::Error,
603 >,
604 > + Send {
605 async move {
606 let (transaction, at) = match self.transaction_by_hash_at(hash).await? {
607 None => return Ok(None),
608 Some(res) => res,
609 };
610
611 let block_hash = match at {
613 BlockId::Hash(hash) => hash.block_hash,
614 _ => return Ok(None),
615 };
616 let block = self
617 .cache()
618 .get_recovered_block(block_hash)
619 .await
620 .map_err(Self::Error::from_eth_err)?;
621 Ok(block.map(|block| (transaction, block)))
622 }
623 }
624}