reth_rpc_eth_api/helpers/
transaction.rs1use super::{EthApiSpec, EthSigner, LoadBlock, LoadFee, LoadReceipt, LoadState, SpawnBlocking};
5use crate::{
6 helpers::{estimate::EstimateCall, spec::SignersForRpc},
7 FromEthApiError, FullEthApiTypes, IntoEthApiError, RpcNodeCore, RpcNodeCoreExt, RpcReceipt,
8 RpcTransaction,
9};
10use alloy_consensus::{
11 transaction::{SignerRecoverable, TransactionMeta, TxHashRef},
12 BlockHeader, Transaction,
13};
14use alloy_dyn_abi::TypedData;
15use alloy_eips::{eip2718::Encodable2718, BlockId};
16use alloy_network::{TransactionBuilder, TransactionBuilder4844};
17use alloy_primitives::{Address, Bytes, TxHash, B256, U256};
18use alloy_rpc_types_eth::TransactionInfo;
19use futures::{Future, StreamExt};
20use reth_chain_state::CanonStateSubscriptions;
21use reth_primitives_traits::{
22 BlockBody, Recovered, RecoveredBlock, SignedTransaction, TxTy, WithEncoded,
23};
24use reth_rpc_convert::{transaction::RpcConvert, RpcTxReq, TransactionConversionError};
25use reth_rpc_eth_types::{
26 block::convert_transaction_receipt,
27 utils::{binary_search, recover_raw_transaction},
28 EthApiError::{self, TransactionConfirmationTimeout},
29 FillTransaction, SignError, TransactionSource,
30};
31use reth_storage_api::{
32 BlockNumReader, BlockReaderIdExt, ProviderBlock, ProviderReceipt, ProviderTx, ReceiptProvider,
33 TransactionsProvider,
34};
35use reth_transaction_pool::{
36 AddedTransactionOutcome, PoolPooledTx, PoolTransaction, TransactionOrigin, TransactionPool,
37};
38use std::{sync::Arc, time::Duration};
39
40pub trait EthTransactions: LoadTransaction<Provider: BlockReaderIdExt> {
63 fn signers(&self) -> &SignersForRpc<Self::Provider, Self::NetworkTypes>;
67
68 fn accounts(&self) -> Vec<Address> {
70 self.signers().read().iter().flat_map(|s| s.accounts()).collect()
71 }
72
73 fn send_raw_transaction_sync_timeout(&self) -> Duration;
75
76 fn send_raw_transaction(
80 &self,
81 tx: Bytes,
82 ) -> impl Future<Output = Result<B256, Self::Error>> + Send {
83 async move {
84 let recovered = recover_raw_transaction::<PoolPooledTx<Self::Pool>>(&tx)?;
85 self.send_transaction(TransactionOrigin::External, WithEncoded::new(tx, recovered))
86 .await
87 }
88 }
89
90 fn send_transaction(
92 &self,
93 origin: TransactionOrigin,
94 tx: WithEncoded<Recovered<PoolPooledTx<Self::Pool>>>,
95 ) -> impl Future<Output = Result<B256, Self::Error>> + Send;
96
97 fn send_raw_transaction_sync(
101 &self,
102 tx: Bytes,
103 ) -> impl Future<Output = Result<RpcReceipt<Self::NetworkTypes>, Self::Error>> + Send
104 where
105 Self: LoadReceipt + 'static,
106 {
107 let this = self.clone();
108 let timeout_duration = self.send_raw_transaction_sync_timeout();
109 async move {
110 let mut stream = this.provider().canonical_state_stream();
111 let hash = EthTransactions::send_raw_transaction(&this, tx).await?;
112 tokio::time::timeout(timeout_duration, async {
113 while let Some(notification) = stream.next().await {
114 let chain = notification.committed();
115 if let Some((block, tx, receipt, all_receipts)) =
116 chain.find_transaction_and_receipt_by_hash(hash) &&
117 let Some(receipt) = convert_transaction_receipt(
118 block,
119 all_receipts,
120 tx,
121 receipt,
122 this.converter(),
123 )
124 .transpose()
125 .map_err(Self::Error::from)?
126 {
127 return Ok(receipt);
128 }
129 }
130 Err(Self::Error::from_eth_err(TransactionConfirmationTimeout {
131 hash,
132 duration: timeout_duration,
133 }))
134 })
135 .await
136 .unwrap_or_else(|_elapsed| {
137 Err(Self::Error::from_eth_err(TransactionConfirmationTimeout {
138 hash,
139 duration: timeout_duration,
140 }))
141 })
142 }
143 }
144
145 #[expect(clippy::complexity)]
151 fn transaction_by_hash(
152 &self,
153 hash: B256,
154 ) -> impl Future<
155 Output = Result<Option<TransactionSource<ProviderTx<Self::Provider>>>, Self::Error>,
156 > + Send {
157 LoadTransaction::transaction_by_hash(self, hash)
158 }
159
160 #[expect(clippy::type_complexity)]
164 fn transactions_by_block(
165 &self,
166 block: B256,
167 ) -> impl Future<Output = Result<Option<Vec<ProviderTx<Self::Provider>>>, Self::Error>> + Send
168 {
169 async move {
170 self.cache()
171 .get_recovered_block(block)
172 .await
173 .map(|b| b.map(|b| b.body().transactions().to_vec()))
174 .map_err(Self::Error::from_eth_err)
175 }
176 }
177
178 fn raw_transaction_by_hash(
186 &self,
187 hash: B256,
188 ) -> impl Future<Output = Result<Option<Bytes>, Self::Error>> + Send {
189 async move {
190 if let Some(tx) =
192 self.pool().get_pooled_transaction_element(hash).map(|tx| tx.encoded_2718().into())
193 {
194 return Ok(Some(tx))
195 }
196
197 self.spawn_blocking_io(move |ref this| {
198 Ok(this
199 .provider()
200 .transaction_by_hash(hash)
201 .map_err(Self::Error::from_eth_err)?
202 .map(|tx| tx.encoded_2718().into()))
203 })
204 .await
205 }
206 }
207
208 #[expect(clippy::type_complexity)]
210 fn historical_transaction_by_hash_at(
211 &self,
212 hash: B256,
213 ) -> impl Future<
214 Output = Result<Option<(TransactionSource<ProviderTx<Self::Provider>>, B256)>, Self::Error>,
215 > + Send {
216 async move {
217 match self.transaction_by_hash_at(hash).await? {
218 None => Ok(None),
219 Some((tx, at)) => Ok(at.as_block_hash().map(|hash| (tx, hash))),
220 }
221 }
222 }
223
224 fn transaction_receipt(
229 &self,
230 hash: B256,
231 ) -> impl Future<Output = Result<Option<RpcReceipt<Self::NetworkTypes>>, Self::Error>> + Send
232 where
233 Self: LoadReceipt + 'static,
234 {
235 async move {
236 match self.load_transaction_and_receipt(hash).await? {
237 Some((tx, meta, receipt, all_receipts)) => {
238 self.build_transaction_receipt(tx, meta, receipt, all_receipts).await.map(Some)
239 }
240 None => Ok(None),
241 }
242 }
243 }
244
245 #[expect(clippy::complexity)]
249 fn load_transaction_and_receipt(
250 &self,
251 hash: TxHash,
252 ) -> impl Future<
253 Output = Result<
254 Option<(
255 Recovered<ProviderTx<Self::Provider>>,
256 TransactionMeta,
257 ProviderReceipt<Self::Provider>,
258 Option<Arc<Vec<ProviderReceipt<Self::Provider>>>>,
259 )>,
260 Self::Error,
261 >,
262 > + Send
263 where
264 Self: 'static,
265 {
266 async move {
267 if let Some(cached) = self.cache().get_transaction_by_hash(hash).await &&
268 let Some(tx) = cached.recovered_transaction().map(|tx| tx.cloned())
269 {
270 let meta = cached.transaction_meta(hash);
271
272 if let Some(all_receipts) = cached.receipts.clone() &&
274 let Some(receipt) = all_receipts.get(cached.tx_index).cloned()
275 {
276 return Ok(Some((tx, meta, receipt, Some(all_receipts))));
277 }
278
279 if let Some(receipts) = self
283 .cache()
284 .get_receipts(cached.block.hash())
285 .await
286 .map_err(Self::Error::from_eth_err)? &&
287 let Some(receipt) = receipts.get(cached.tx_index).cloned()
288 {
289 return Ok(Some((tx, meta, receipt, Some(receipts))));
290 }
291 }
292
293 self.spawn_blocking_io(move |this| {
295 let provider = this.provider();
296 let Some((tx, meta)) = provider
297 .transaction_by_hash_with_meta(hash)
298 .map_err(Self::Error::from_eth_err)?
299 else {
300 return Ok(None);
301 };
302
303 let tx = tx.try_into_recovered_unchecked().map_err(Self::Error::from_eth_err)?;
304
305 let receipt = provider.receipt_by_hash(hash).map_err(Self::Error::from_eth_err)?;
306
307 Ok(receipt.map(|receipt| (tx, meta, receipt, None)))
308 })
309 .await
310 }
311 }
312
313 fn transaction_by_block_and_tx_index(
317 &self,
318 block_id: BlockId,
319 index: usize,
320 ) -> impl Future<Output = Result<Option<RpcTransaction<Self::NetworkTypes>>, Self::Error>> + Send
321 where
322 Self: LoadBlock,
323 {
324 async move {
325 if let Some(block) = self.recovered_block(block_id).await? {
326 let block_hash = block.hash();
327 let block_number = block.number();
328 let base_fee_per_gas = block.base_fee_per_gas();
329 if let Some((signer, tx)) = block.transactions_with_sender().nth(index) {
330 #[allow(clippy::needless_update)]
331 let tx_info = TransactionInfo {
332 hash: Some(*tx.tx_hash()),
333 block_hash: Some(block_hash),
334 block_number: Some(block_number),
335 base_fee: base_fee_per_gas,
336 index: Some(index as u64),
337 ..Default::default()
338 };
339
340 return Ok(Some(
341 self.converter().fill(tx.clone().with_signer(*signer), tx_info)?,
342 ))
343 }
344 }
345
346 Ok(None)
347 }
348 }
349
350 fn get_transaction_by_sender_and_nonce(
352 &self,
353 sender: Address,
354 nonce: u64,
355 include_pending: bool,
356 ) -> impl Future<Output = Result<Option<RpcTransaction<Self::NetworkTypes>>, Self::Error>> + Send
357 where
358 Self: LoadBlock + LoadState,
359 {
360 async move {
361 if include_pending &&
363 let Some(tx) =
364 RpcNodeCore::pool(self).get_transaction_by_sender_and_nonce(sender, nonce)
365 {
366 let transaction = tx.transaction.clone_into_consensus();
367 return Ok(Some(self.converter().fill_pending(transaction)?));
368 }
369
370 let highest = self.transaction_count(sender, None).await?.saturating_to::<u64>();
374
375 if nonce >= highest {
378 return Ok(None);
379 }
380
381 let high = self.provider().best_block_number().map_err(Self::Error::from_eth_err)?;
382
383 let num = binary_search::<_, _, Self::Error>(1, high, |mid| async move {
386 let mid_nonce =
387 self.transaction_count(sender, Some(mid.into())).await?.saturating_to::<u64>();
388
389 Ok(mid_nonce > nonce)
390 })
391 .await?;
392
393 let block_id = num.into();
394 self.recovered_block(block_id)
395 .await?
396 .and_then(|block| {
397 let block_hash = block.hash();
398 let block_number = block.number();
399 let base_fee_per_gas = block.base_fee_per_gas();
400
401 block
402 .transactions_with_sender()
403 .enumerate()
404 .find(|(_, (signer, tx))| **signer == sender && (*tx).nonce() == nonce)
405 .map(|(index, (signer, tx))| {
406 #[allow(clippy::needless_update)]
407 let tx_info = TransactionInfo {
408 hash: Some(*tx.tx_hash()),
409 block_hash: Some(block_hash),
410 block_number: Some(block_number),
411 base_fee: base_fee_per_gas,
412 index: Some(index as u64),
413 ..Default::default()
414 };
415 Ok(self.converter().fill(tx.clone().with_signer(*signer), tx_info)?)
416 })
417 })
418 .ok_or(EthApiError::HeaderNotFound(block_id))?
419 .map(Some)
420 }
421 }
422
423 fn raw_transaction_by_block_and_tx_index(
427 &self,
428 block_id: BlockId,
429 index: usize,
430 ) -> impl Future<Output = Result<Option<Bytes>, Self::Error>> + Send
431 where
432 Self: LoadBlock,
433 {
434 async move {
435 if let Some(block) = self.recovered_block(block_id).await? &&
436 let Some(tx) = block.body().transactions().get(index)
437 {
438 return Ok(Some(tx.encoded_2718().into()))
439 }
440
441 Ok(None)
442 }
443 }
444
445 fn send_transaction_request(
448 &self,
449 mut request: RpcTxReq<Self::NetworkTypes>,
450 ) -> impl Future<Output = Result<B256, Self::Error>> + Send
451 where
452 Self: EthApiSpec + LoadBlock + EstimateCall,
453 {
454 async move {
455 let from = match request.as_ref().from() {
456 Some(from) => from,
457 None => return Err(SignError::NoAccount.into_eth_err()),
458 };
459
460 if self.find_signer(&from).is_err() {
461 return Err(SignError::NoAccount.into_eth_err())
462 }
463
464 if request.as_ref().nonce().is_none() {
466 let nonce = self.next_available_nonce(from).await?;
467 request.as_mut().set_nonce(nonce);
468 }
469
470 let chain_id = self.chain_id();
471 request.as_mut().set_chain_id(chain_id.to());
472
473 let estimated_gas =
474 self.estimate_gas_at(request.clone(), BlockId::pending(), None).await?;
475 let gas_limit = estimated_gas;
476 request.as_mut().set_gas_limit(gas_limit.to());
477
478 let transaction = self.sign_request(&from, request).await?.with_signer(from);
479
480 let pool_transaction =
481 <<Self as RpcNodeCore>::Pool as TransactionPool>::Transaction::try_from_consensus(
482 transaction,
483 )
484 .map_err(|e| {
485 Self::Error::from_eth_err(TransactionConversionError::Other(e.to_string()))
486 })?;
487
488 let AddedTransactionOutcome { hash, .. } = self
490 .pool()
491 .add_transaction(TransactionOrigin::Local, pool_transaction)
492 .await
493 .map_err(Self::Error::from_eth_err)?;
494
495 Ok(hash)
496 }
497 }
498
499 fn fill_transaction(
501 &self,
502 mut request: RpcTxReq<Self::NetworkTypes>,
503 ) -> impl Future<Output = Result<FillTransaction<TxTy<Self::Primitives>>, Self::Error>> + Send
504 where
505 Self: EthApiSpec + LoadBlock + EstimateCall + LoadFee,
506 {
507 async move {
508 let from = match request.as_ref().from() {
509 Some(from) => from,
510 None => return Err(SignError::NoAccount.into_eth_err()),
511 };
512
513 if request.as_ref().value().is_none() {
514 request.as_mut().set_value(U256::ZERO);
515 }
516
517 if request.as_ref().nonce().is_none() {
518 let nonce = self.next_available_nonce(from).await?;
519 request.as_mut().set_nonce(nonce);
520 }
521
522 let chain_id = self.chain_id();
523 request.as_mut().set_chain_id(chain_id.to());
524
525 if request.as_ref().has_eip4844_fields() &&
526 request.as_ref().max_fee_per_blob_gas().is_none()
527 {
528 let blob_fee = self.blob_base_fee().await?;
529 request.as_mut().set_max_fee_per_blob_gas(blob_fee.to());
530 }
531
532 if request.as_ref().sidecar.is_some() &&
535 request.as_ref().blob_versioned_hashes.is_none()
536 {
537 request.as_mut().populate_blob_hashes();
538 }
539
540 if request.as_ref().gas_limit().is_none() {
541 let estimated_gas =
542 self.estimate_gas_at(request.clone(), BlockId::pending(), None).await?;
543 request.as_mut().set_gas_limit(estimated_gas.to());
544 }
545
546 if request.as_ref().gas_price().is_none() {
547 let tip = if let Some(tip) = request.as_ref().max_priority_fee_per_gas() {
548 tip
549 } else {
550 let tip = self.suggested_priority_fee().await?.to::<u128>();
551 request.as_mut().set_max_priority_fee_per_gas(tip);
552 tip
553 };
554 if request.as_ref().max_fee_per_gas().is_none() {
555 let header =
556 self.provider().latest_header().map_err(Self::Error::from_eth_err)?;
557 let base_fee = header.and_then(|h| h.base_fee_per_gas()).unwrap_or_default();
558 request.as_mut().set_max_fee_per_gas(base_fee as u128 + tip);
559 }
560 }
561
562 let tx = self.converter().build_simulate_v1_transaction(request)?;
563
564 let raw = tx.encoded_2718().into();
565
566 Ok(FillTransaction { raw, tx })
567 }
568 }
569
570 fn sign_request(
572 &self,
573 from: &Address,
574 txn: RpcTxReq<Self::NetworkTypes>,
575 ) -> impl Future<Output = Result<ProviderTx<Self::Provider>, Self::Error>> + Send {
576 async move {
577 self.find_signer(from)?
578 .sign_transaction(txn, from)
579 .await
580 .map_err(Self::Error::from_eth_err)
581 }
582 }
583
584 fn sign(
586 &self,
587 account: Address,
588 message: Bytes,
589 ) -> impl Future<Output = Result<Bytes, Self::Error>> + Send {
590 async move {
591 Ok(self
592 .find_signer(&account)?
593 .sign(account, &message)
594 .await
595 .map_err(Self::Error::from_eth_err)?
596 .as_bytes()
597 .into())
598 }
599 }
600
601 fn sign_transaction(
604 &self,
605 request: RpcTxReq<Self::NetworkTypes>,
606 ) -> impl Future<Output = Result<Bytes, Self::Error>> + Send {
607 async move {
608 let from = match request.as_ref().from() {
609 Some(from) => from,
610 None => return Err(SignError::NoAccount.into_eth_err()),
611 };
612
613 Ok(self.sign_request(&from, request).await?.encoded_2718().into())
614 }
615 }
616
617 fn sign_typed_data(&self, data: &TypedData, account: Address) -> Result<Bytes, Self::Error> {
619 Ok(self
620 .find_signer(&account)?
621 .sign_typed_data(account, data)
622 .map_err(Self::Error::from_eth_err)?
623 .as_bytes()
624 .into())
625 }
626
627 #[expect(clippy::type_complexity)]
629 fn find_signer(
630 &self,
631 account: &Address,
632 ) -> Result<
633 Box<dyn EthSigner<ProviderTx<Self::Provider>, RpcTxReq<Self::NetworkTypes>> + 'static>,
634 Self::Error,
635 > {
636 self.signers()
637 .read()
638 .iter()
639 .find(|signer| signer.is_signer_for(account))
640 .map(|signer| dyn_clone::clone_box(&**signer))
641 .ok_or_else(|| SignError::NoAccount.into_eth_err())
642 }
643}
644
645pub trait LoadTransaction: SpawnBlocking + FullEthApiTypes + RpcNodeCoreExt {
650 #[expect(clippy::complexity)]
656 fn transaction_by_hash(
657 &self,
658 hash: B256,
659 ) -> impl Future<
660 Output = Result<Option<TransactionSource<ProviderTx<Self::Provider>>>, Self::Error>,
661 > + Send {
662 async move {
663 if let Some(cached) = self.cache().get_transaction_by_hash(hash).await &&
665 let Some(source) = cached.to_transaction_source()
666 {
667 return Ok(Some(source));
668 }
669
670 if let Some((tx, meta)) = self
672 .spawn_blocking_io(move |this| {
673 this.provider()
674 .transaction_by_hash_with_meta(hash)
675 .map_err(Self::Error::from_eth_err)
676 })
677 .await?
678 {
679 let transaction = tx
683 .try_into_recovered_unchecked()
684 .map_err(|_| EthApiError::InvalidTransactionSignature)?;
685
686 return Ok(Some(TransactionSource::Block {
687 transaction,
688 index: meta.index,
689 block_hash: meta.block_hash,
690 block_number: meta.block_number,
691 base_fee: meta.base_fee,
692 }));
693 }
694
695 if let Some(tx) = self.pool().get(&hash).map(|tx| tx.transaction.clone_into_consensus())
697 {
698 return Ok(Some(TransactionSource::Pool(tx.into())));
699 }
700
701 Ok(None)
702 }
703 }
704
705 #[expect(clippy::type_complexity)]
709 fn transaction_by_hash_at(
710 &self,
711 transaction_hash: B256,
712 ) -> impl Future<
713 Output = Result<
714 Option<(TransactionSource<ProviderTx<Self::Provider>>, BlockId)>,
715 Self::Error,
716 >,
717 > + Send {
718 async move {
719 Ok(self.transaction_by_hash(transaction_hash).await?.map(|tx| match tx {
720 tx @ TransactionSource::Pool(_) => (tx, BlockId::pending()),
721 tx @ TransactionSource::Block { block_hash, .. } => {
722 (tx, BlockId::Hash(block_hash.into()))
723 }
724 }))
725 }
726 }
727
728 #[expect(clippy::type_complexity)]
730 fn transaction_and_block(
731 &self,
732 hash: B256,
733 ) -> impl Future<
734 Output = Result<
735 Option<(
736 TransactionSource<ProviderTx<Self::Provider>>,
737 Arc<RecoveredBlock<ProviderBlock<Self::Provider>>>,
738 )>,
739 Self::Error,
740 >,
741 > + Send {
742 async move {
743 let (transaction, at) = match self.transaction_by_hash_at(hash).await? {
744 None => return Ok(None),
745 Some(res) => res,
746 };
747
748 let block_hash = match at {
750 BlockId::Hash(hash) => hash.block_hash,
751 _ => return Ok(None),
752 };
753 let block = self
754 .cache()
755 .get_recovered_block(block_hash)
756 .await
757 .map_err(Self::Error::from_eth_err)?;
758 Ok(block.map(|block| (transaction, block)))
759 }
760 }
761}