1use super::{EthApiSpec, EthSigner, LoadBlock, LoadFee, LoadReceipt, LoadState, SpawnBlocking};
5use crate::{
6 helpers::{estimate::EstimateCall, spec::SignersForRpc},
7 FromEthApiError, FullEthApiTypes, IntoEthApiError, RpcNodeCore, RpcNodeCoreExt, RpcReceipt,
8 RpcTransaction,
9};
10use alloy_consensus::{
11 transaction::{SignerRecoverable, TransactionMeta, TxHashRef},
12 BlockHeader, Transaction,
13};
14use alloy_dyn_abi::TypedData;
15use alloy_eips::{eip2718::Encodable2718, BlockId};
16use alloy_network::{TransactionBuilder, TransactionBuilder4844};
17use alloy_primitives::{Address, Bytes, TxHash, B256, U256};
18use alloy_rpc_types_eth::{state::EvmOverrides, TransactionInfo};
19use futures::{Future, StreamExt};
20use reth_chain_state::CanonStateSubscriptions;
21use reth_primitives_traits::{
22 BlockBody, Recovered, RecoveredBlock, SignedTransaction, TxTy, WithEncoded,
23};
24use reth_rpc_convert::{transaction::RpcConvert, RpcTxReq, TransactionConversionError};
25use reth_rpc_eth_types::{
26 block::convert_transaction_receipt,
27 utils::binary_search,
28 EthApiError::{self, TransactionConfirmationTimeout},
29 FillTransaction, SignError, TransactionSource,
30};
31use reth_storage_api::{
32 BlockNumReader, BlockReaderIdExt, ProviderBlock, ProviderReceipt, ProviderTx, ReceiptProvider,
33 TransactionsProvider,
34};
35use reth_transaction_pool::{
36 AddedTransactionOutcome, PoolPooledTx, PoolTransaction, PoolTx, TransactionOrigin,
37 TransactionPool,
38};
39use std::{sync::Arc, time::Duration};
40
41pub trait EthTransactions: LoadTransaction<Provider: BlockReaderIdExt> {
/// Returns the collection of signers registered for this RPC instance.
///
/// Other methods of this trait take a read guard on the returned handle (`.read()`) and
/// iterate over it to list accounts or to look up the signer responsible for an address.
fn signers(&self) -> &SignersForRpc<Self::Provider, Self::NetworkTypes>;
68
69 fn accounts(&self) -> Vec<Address> {
71 self.signers().read().iter().flat_map(|s| s.accounts()).collect()
72 }
73
/// Returns the maximum time `send_raw_transaction_sync` waits for the submitted
/// transaction to show up in a committed chain before giving up with
/// `TransactionConfirmationTimeout`.
fn send_raw_transaction_sync_timeout(&self) -> Duration;
76
77 fn send_raw_transaction(
81 &self,
82 tx: Bytes,
83 ) -> impl Future<Output = Result<B256, Self::Error>> + Send {
84 async move {
85 let pool_transaction =
86 <PoolTx<Self::Pool> as PoolTransaction>::recover_raw_transaction(&tx)
87 .map_err(Self::Error::from_eth_err)?;
88 self.send_pool_transaction(
89 TransactionOrigin::Local,
90 WithEncoded::new(tx, pool_transaction),
91 )
92 .await
93 }
94 }
95
96 fn send_transaction(
98 &self,
99 origin: TransactionOrigin,
100 tx: WithEncoded<Recovered<PoolPooledTx<Self::Pool>>>,
101 ) -> impl Future<Output = Result<B256, Self::Error>> + Send {
102 async move {
103 let (encoded, recovered) = tx.split();
104 let pool_transaction =
105 <Self::Pool as TransactionPool>::Transaction::from_pooled(recovered);
106
107 self.send_pool_transaction(origin, WithEncoded::new(encoded, pool_transaction)).await
108 }
109 }
110
/// Submits a pool transaction, paired with its encoded bytes, under the given `origin`.
///
/// This is the required hook that [`Self::send_raw_transaction`] and
/// [`Self::send_transaction`] funnel into. The returned hash is treated by
/// `send_raw_transaction_sync` as the hash of the submitted transaction.
// NOTE(review): how the transaction is actually submitted is up to the implementor;
// nothing in this trait constrains it.
fn send_pool_transaction(
    &self,
    origin: TransactionOrigin,
    tx: WithEncoded<PoolTx<Self::Pool>>,
) -> impl Future<Output = Result<B256, Self::Error>> + Send;
117
118 fn send_raw_transaction_sync(
122 &self,
123 tx: Bytes,
124 ) -> impl Future<Output = Result<RpcReceipt<Self::NetworkTypes>, Self::Error>> + Send
125 where
126 Self: LoadReceipt + 'static,
127 {
128 let this = self.clone();
129 let timeout_duration = self.send_raw_transaction_sync_timeout();
130 async move {
131 let mut stream = this.provider().canonical_state_stream();
132 let hash = EthTransactions::send_raw_transaction(&this, tx).await?;
133 tokio::time::timeout(timeout_duration, async {
134 while let Some(notification) = stream.next().await {
135 let chain = notification.committed();
136 if let Some((block, tx, receipt, all_receipts)) =
137 chain.find_transaction_and_receipt_by_hash(hash) &&
138 let Some(receipt) = convert_transaction_receipt(
139 block,
140 all_receipts,
141 tx,
142 receipt,
143 this.converter(),
144 )
145 .transpose()
146 .map_err(Self::Error::from)?
147 {
148 return Ok(receipt);
149 }
150 }
151 Err(Self::Error::from_eth_err(TransactionConfirmationTimeout {
152 hash,
153 duration: timeout_duration,
154 }))
155 })
156 .await
157 .unwrap_or_else(|_elapsed| {
158 Err(Self::Error::from_eth_err(TransactionConfirmationTimeout {
159 hash,
160 duration: timeout_duration,
161 }))
162 })
163 }
164 }
165
166 #[expect(clippy::complexity)]
172 fn transaction_by_hash(
173 &self,
174 hash: B256,
175 ) -> impl Future<
176 Output = Result<Option<TransactionSource<ProviderTx<Self::Provider>>>, Self::Error>,
177 > + Send {
178 LoadTransaction::transaction_by_hash(self, hash)
179 }
180
181 fn pending_transactions(&self) -> Result<Vec<RpcTransaction<Self::NetworkTypes>>, Self::Error> {
183 self.pool()
184 .pending_transactions()
185 .into_iter()
186 .map(|tx| self.converter().fill_pending(tx.transaction.clone_into_consensus()))
187 .collect::<Result<Vec<_>, _>>()
188 .map_err(Self::Error::from)
189 }
190
191 #[expect(clippy::type_complexity)]
195 fn transactions_by_block(
196 &self,
197 block: B256,
198 ) -> impl Future<Output = Result<Option<Vec<ProviderTx<Self::Provider>>>, Self::Error>> + Send
199 {
200 async move {
201 self.cache()
202 .get_recovered_block(block)
203 .await
204 .map(|b| b.map(|b| b.body().transactions().to_vec()))
205 .map_err(Self::Error::from_eth_err)
206 }
207 }
208
209 fn raw_transaction_by_hash(
217 &self,
218 hash: B256,
219 ) -> impl Future<Output = Result<Option<Bytes>, Self::Error>> + Send {
220 async move {
221 if let Some(tx) =
223 self.pool().get_pooled_transaction_element(hash).map(|tx| tx.encoded_2718().into())
224 {
225 return Ok(Some(tx))
226 }
227
228 self.spawn_blocking_io(move |ref this| {
229 Ok(this
230 .provider()
231 .transaction_by_hash(hash)
232 .map_err(Self::Error::from_eth_err)?
233 .map(|tx| tx.encoded_2718().into()))
234 })
235 .await
236 }
237 }
238
239 #[expect(clippy::type_complexity)]
241 fn historical_transaction_by_hash_at(
242 &self,
243 hash: B256,
244 ) -> impl Future<
245 Output = Result<Option<(TransactionSource<ProviderTx<Self::Provider>>, B256)>, Self::Error>,
246 > + Send {
247 async move {
248 match self.transaction_by_hash_at(hash).await? {
249 None => Ok(None),
250 Some((tx, at)) => Ok(at.as_block_hash().map(|hash| (tx, hash))),
251 }
252 }
253 }
254
255 fn transaction_receipt(
260 &self,
261 hash: B256,
262 ) -> impl Future<Output = Result<Option<RpcReceipt<Self::NetworkTypes>>, Self::Error>> + Send
263 where
264 Self: LoadReceipt + 'static,
265 {
266 async move {
267 match self.load_transaction_and_receipt(hash).await? {
268 Some((tx, meta, receipt, all_receipts)) => {
269 self.build_transaction_receipt(tx, meta, receipt, all_receipts).await.map(Some)
270 }
271 None => Ok(None),
272 }
273 }
274 }
275
276 #[expect(clippy::complexity)]
280 fn load_transaction_and_receipt(
281 &self,
282 hash: TxHash,
283 ) -> impl Future<
284 Output = Result<
285 Option<(
286 Recovered<ProviderTx<Self::Provider>>,
287 TransactionMeta,
288 ProviderReceipt<Self::Provider>,
289 Option<Arc<Vec<ProviderReceipt<Self::Provider>>>>,
290 )>,
291 Self::Error,
292 >,
293 > + Send
294 where
295 Self: 'static,
296 {
297 async move {
298 if let Some(cached) = self.cache().get_transaction_by_hash(hash).await &&
299 let Some(tx) = cached.recovered_transaction().map(|tx| tx.cloned())
300 {
301 let meta = cached.transaction_meta(hash);
302
303 if let Some(all_receipts) = cached.receipts.clone() &&
305 let Some(receipt) = all_receipts.get(cached.tx_index).cloned()
306 {
307 return Ok(Some((tx, meta, receipt, Some(all_receipts))));
308 }
309
310 if let Some(receipts) = self
314 .cache()
315 .get_receipts(cached.block.hash())
316 .await
317 .map_err(Self::Error::from_eth_err)? &&
318 let Some(receipt) = receipts.get(cached.tx_index).cloned()
319 {
320 return Ok(Some((tx, meta, receipt, Some(receipts))));
321 }
322 }
323
324 self.spawn_blocking_io(move |this| {
326 let provider = this.provider();
327 let Some((tx, meta)) = provider
328 .transaction_by_hash_with_meta(hash)
329 .map_err(Self::Error::from_eth_err)?
330 else {
331 return Ok(None);
332 };
333
334 let tx = tx.try_into_recovered_unchecked().map_err(Self::Error::from_eth_err)?;
335
336 let receipt = provider.receipt_by_hash(hash).map_err(Self::Error::from_eth_err)?;
337
338 Ok(receipt.map(|receipt| (tx, meta, receipt, None)))
339 })
340 .await
341 }
342 }
343
344 fn transaction_by_block_and_tx_index(
348 &self,
349 block_id: BlockId,
350 index: usize,
351 ) -> impl Future<Output = Result<Option<RpcTransaction<Self::NetworkTypes>>, Self::Error>> + Send
352 where
353 Self: LoadBlock,
354 {
355 async move {
356 if let Some(block) = self.recovered_block(block_id).await? {
357 let block_hash = block.hash();
358 let block_number = block.number();
359 let block_timestamp = block.timestamp();
360 let base_fee_per_gas = block.base_fee_per_gas();
361 if let Some((signer, tx)) = block.transactions_with_sender().nth(index) {
362 let tx_info = TransactionInfo {
363 hash: Some(*tx.tx_hash()),
364 block_hash: Some(block_hash),
365 block_number: Some(block_number),
366 block_timestamp: Some(block_timestamp),
367 base_fee: base_fee_per_gas,
368 index: Some(index as u64),
369 };
370
371 return Ok(Some(
372 self.converter().fill(tx.clone().with_signer(*signer), tx_info)?,
373 ))
374 }
375 }
376
377 Ok(None)
378 }
379 }
380
381 fn get_transaction_by_sender_and_nonce(
383 &self,
384 sender: Address,
385 nonce: u64,
386 include_pending: bool,
387 ) -> impl Future<Output = Result<Option<RpcTransaction<Self::NetworkTypes>>, Self::Error>> + Send
388 where
389 Self: LoadBlock + LoadState,
390 {
391 async move {
392 if include_pending &&
394 let Some(tx) =
395 RpcNodeCore::pool(self).get_transaction_by_sender_and_nonce(sender, nonce)
396 {
397 let transaction = tx.transaction.clone_into_consensus();
398 return Ok(Some(self.converter().fill_pending(transaction)?));
399 }
400
401 let highest = self.transaction_count(sender, None).await?.saturating_to::<u64>();
405
406 if nonce >= highest {
409 return Ok(None);
410 }
411
412 let high = self.provider().best_block_number().map_err(Self::Error::from_eth_err)?;
413
414 let num = binary_search::<_, _, Self::Error>(1, high, |mid| async move {
417 let mid_nonce =
418 self.transaction_count(sender, Some(mid.into())).await?.saturating_to::<u64>();
419
420 Ok(mid_nonce > nonce)
421 })
422 .await?;
423
424 let block_id = num.into();
425 self.recovered_block(block_id)
426 .await?
427 .and_then(|block| {
428 let block_hash = block.hash();
429 let block_number = block.number();
430 let block_timestamp = block.timestamp();
431 let base_fee_per_gas = block.base_fee_per_gas();
432
433 block
434 .transactions_with_sender()
435 .enumerate()
436 .find(|(_, (signer, tx))| **signer == sender && (*tx).nonce() == nonce)
437 .map(|(index, (signer, tx))| {
438 let tx_info = TransactionInfo {
439 hash: Some(*tx.tx_hash()),
440 block_hash: Some(block_hash),
441 block_number: Some(block_number),
442 block_timestamp: Some(block_timestamp),
443 base_fee: base_fee_per_gas,
444 index: Some(index as u64),
445 };
446 Ok(self.converter().fill(tx.clone().with_signer(*signer), tx_info)?)
447 })
448 })
449 .ok_or(EthApiError::HeaderNotFound(block_id))?
450 .map(Some)
451 }
452 }
453
454 fn raw_transaction_by_block_and_tx_index(
458 &self,
459 block_id: BlockId,
460 index: usize,
461 ) -> impl Future<Output = Result<Option<Bytes>, Self::Error>> + Send
462 where
463 Self: LoadBlock,
464 {
465 async move {
466 if let Some(block) = self.recovered_block(block_id).await? &&
467 let Some(tx) = block.body().transactions().get(index)
468 {
469 return Ok(Some(tx.encoded_2718().into()))
470 }
471
472 Ok(None)
473 }
474 }
475
476 fn send_transaction_request(
479 &self,
480 mut request: RpcTxReq<Self::NetworkTypes>,
481 ) -> impl Future<Output = Result<B256, Self::Error>> + Send
482 where
483 Self: EthApiSpec + LoadBlock + EstimateCall,
484 {
485 async move {
486 let from = match request.as_ref().from() {
487 Some(from) => from,
488 None => return Err(SignError::NoAccount.into_eth_err()),
489 };
490
491 if self.find_signer(&from).is_err() {
492 return Err(SignError::NoAccount.into_eth_err())
493 }
494
495 if request.as_ref().nonce().is_none() {
497 let nonce = self.next_available_nonce_for(&request).await?;
498 request.as_mut().set_nonce(nonce);
499 }
500
501 let chain_id = self.chain_id();
502 request.as_mut().set_chain_id(chain_id.to());
503
504 let estimated_gas = self
505 .estimate_gas_at(request.clone(), BlockId::pending(), EvmOverrides::default())
506 .await?;
507 let gas_limit = estimated_gas;
508 request.as_mut().set_gas_limit(gas_limit.to());
509
510 let transaction = self.sign_request(&from, request).await?.with_signer(from);
511
512 let pool_transaction =
513 <<Self as RpcNodeCore>::Pool as TransactionPool>::Transaction::try_from_consensus(
514 transaction,
515 )
516 .map_err(|e| {
517 Self::Error::from_eth_err(TransactionConversionError::Other(e.to_string()))
518 })?;
519
520 let AddedTransactionOutcome { hash, .. } = self
522 .pool()
523 .add_transaction(TransactionOrigin::Local, pool_transaction)
524 .await
525 .map_err(Self::Error::from_eth_err)?;
526
527 Ok(hash)
528 }
529 }
530
531 fn fill_transaction(
533 &self,
534 mut request: RpcTxReq<Self::NetworkTypes>,
535 ) -> impl Future<Output = Result<FillTransaction<TxTy<Self::Primitives>>, Self::Error>> + Send
536 where
537 Self: EthApiSpec + LoadBlock + EstimateCall + LoadFee,
538 {
539 async move {
540 if request.as_ref().value().is_none() {
541 request.as_mut().set_value(U256::ZERO);
542 }
543
544 if request.as_ref().nonce().is_none() {
545 let nonce = self.next_available_nonce_for(&request).await?;
546 request.as_mut().set_nonce(nonce);
547 }
548
549 let chain_id = self.chain_id();
550 request.as_mut().set_chain_id(chain_id.to());
551
552 if request.as_ref().has_eip4844_fields() &&
553 request.as_ref().max_fee_per_blob_gas().is_none()
554 {
555 let blob_fee = self.blob_base_fee().await?;
556 request.as_mut().set_max_fee_per_blob_gas(blob_fee.to());
557 }
558
559 if request.as_ref().sidecar.is_some() &&
562 request.as_ref().blob_versioned_hashes.is_none()
563 {
564 request.as_mut().populate_blob_hashes();
565 }
566
567 if request.as_ref().gas_limit().is_none() {
568 let estimated_gas = self
569 .estimate_gas_at(request.clone(), BlockId::pending(), EvmOverrides::default())
570 .await?;
571 request.as_mut().set_gas_limit(estimated_gas.to());
572 }
573
574 if request.as_ref().gas_price().is_none() {
575 let tip = if let Some(tip) = request.as_ref().max_priority_fee_per_gas() {
576 tip
577 } else {
578 let tip = self.suggested_priority_fee().await?.to::<u128>();
579 request.as_mut().set_max_priority_fee_per_gas(tip);
580 tip
581 };
582 if request.as_ref().max_fee_per_gas().is_none() {
583 let header =
584 self.provider().latest_header().map_err(Self::Error::from_eth_err)?;
585 let base_fee = header.and_then(|h| h.base_fee_per_gas()).unwrap_or_default();
586 request.as_mut().set_max_fee_per_gas(base_fee as u128 * 2 + tip);
593 }
594 }
595
596 let tx = self.converter().build_simulate_v1_transaction(request)?;
597
598 let raw = tx.encoded_2718().into();
599
600 Ok(FillTransaction { raw, tx })
601 }
602 }
603
604 fn sign_request(
606 &self,
607 from: &Address,
608 txn: RpcTxReq<Self::NetworkTypes>,
609 ) -> impl Future<Output = Result<ProviderTx<Self::Provider>, Self::Error>> + Send {
610 async move {
611 self.find_signer(from)?
612 .sign_transaction(txn, from)
613 .await
614 .map_err(Self::Error::from_eth_err)
615 }
616 }
617
618 fn sign(
620 &self,
621 account: Address,
622 message: Bytes,
623 ) -> impl Future<Output = Result<Bytes, Self::Error>> + Send {
624 async move {
625 Ok(self
626 .find_signer(&account)?
627 .sign(account, &message)
628 .await
629 .map_err(Self::Error::from_eth_err)?
630 .as_bytes()
631 .into())
632 }
633 }
634
635 fn sign_transaction(
638 &self,
639 request: RpcTxReq<Self::NetworkTypes>,
640 ) -> impl Future<Output = Result<Bytes, Self::Error>> + Send {
641 async move {
642 let from = match request.as_ref().from() {
643 Some(from) => from,
644 None => return Err(SignError::NoAccount.into_eth_err()),
645 };
646
647 Ok(self.sign_request(&from, request).await?.encoded_2718().into())
648 }
649 }
650
651 fn sign_typed_data(&self, data: &TypedData, account: Address) -> Result<Bytes, Self::Error> {
653 Ok(self
654 .find_signer(&account)?
655 .sign_typed_data(account, data)
656 .map_err(Self::Error::from_eth_err)?
657 .as_bytes()
658 .into())
659 }
660
661 #[expect(clippy::type_complexity)]
663 fn find_signer(
664 &self,
665 account: &Address,
666 ) -> Result<
667 Box<dyn EthSigner<ProviderTx<Self::Provider>, RpcTxReq<Self::NetworkTypes>> + 'static>,
668 Self::Error,
669 > {
670 self.signers()
671 .read()
672 .iter()
673 .find(|signer| signer.is_signer_for(account))
674 .map(|signer| dyn_clone::clone_box(&**signer))
675 .ok_or_else(|| SignError::NoAccount.into_eth_err())
676 }
677}
678
679pub trait LoadTransaction: SpawnBlocking + FullEthApiTypes + RpcNodeCoreExt {
684 #[expect(clippy::complexity)]
690 fn transaction_by_hash(
691 &self,
692 hash: B256,
693 ) -> impl Future<
694 Output = Result<Option<TransactionSource<ProviderTx<Self::Provider>>>, Self::Error>,
695 > + Send {
696 async move {
697 if let Some(cached) = self.cache().get_transaction_by_hash(hash).await &&
699 let Some(source) = cached.to_transaction_source()
700 {
701 return Ok(Some(source));
702 }
703
704 if let Some((tx, meta)) = self
706 .spawn_blocking_io(move |this| {
707 this.provider()
708 .transaction_by_hash_with_meta(hash)
709 .map_err(Self::Error::from_eth_err)
710 })
711 .await?
712 {
713 let transaction = tx
717 .try_into_recovered_unchecked()
718 .map_err(|_| EthApiError::InvalidTransactionSignature)?;
719
720 return Ok(Some(TransactionSource::Block {
721 transaction,
722 index: meta.index,
723 block_hash: meta.block_hash,
724 block_number: meta.block_number,
725 block_timestamp: meta.timestamp,
726 base_fee: meta.base_fee,
727 }));
728 }
729
730 if let Some(tx) = self.pool().get(&hash).map(|tx| tx.transaction.clone_into_consensus())
732 {
733 return Ok(Some(TransactionSource::Pool(tx.into())));
734 }
735
736 Ok(None)
737 }
738 }
739
740 #[expect(clippy::type_complexity)]
744 fn transaction_by_hash_at(
745 &self,
746 transaction_hash: B256,
747 ) -> impl Future<
748 Output = Result<
749 Option<(TransactionSource<ProviderTx<Self::Provider>>, BlockId)>,
750 Self::Error,
751 >,
752 > + Send {
753 async move {
754 Ok(self.transaction_by_hash(transaction_hash).await?.map(|tx| match tx {
755 tx @ TransactionSource::Pool(_) => (tx, BlockId::pending()),
756 tx @ TransactionSource::Block { block_hash, .. } => {
757 (tx, BlockId::Hash(block_hash.into()))
758 }
759 }))
760 }
761 }
762
763 #[expect(clippy::type_complexity)]
765 fn transaction_and_block(
766 &self,
767 hash: B256,
768 ) -> impl Future<
769 Output = Result<
770 Option<(
771 TransactionSource<ProviderTx<Self::Provider>>,
772 Arc<RecoveredBlock<ProviderBlock<Self::Provider>>>,
773 )>,
774 Self::Error,
775 >,
776 > + Send {
777 async move {
778 let (transaction, at) = match self.transaction_by_hash_at(hash).await? {
779 None => return Ok(None),
780 Some(res) => res,
781 };
782
783 let block_hash = match at {
785 BlockId::Hash(hash) => hash.block_hash,
786 _ => return Ok(None),
787 };
788 let block = self
789 .cache()
790 .get_recovered_block(block_hash)
791 .await
792 .map_err(Self::Error::from_eth_err)?;
793 Ok(block.map(|block| (transaction, block)))
794 }
795 }
796}