1use super::{EthApiSpec, EthSigner, LoadBlock, LoadFee, LoadReceipt, LoadState, SpawnBlocking};
5use crate::{
6 helpers::{estimate::EstimateCall, spec::SignersForRpc},
7 FromEthApiError, FullEthApiTypes, IntoEthApiError, RpcNodeCore, RpcNodeCoreExt, RpcReceipt,
8 RpcTransaction,
9};
10use alloy_consensus::{
11 transaction::{SignerRecoverable, TransactionMeta, TxHashRef},
12 BlockHeader, Transaction,
13};
14use alloy_dyn_abi::TypedData;
15use alloy_eips::{eip2718::Encodable2718, BlockId};
16use alloy_network::{TransactionBuilder, TransactionBuilder4844};
17use alloy_primitives::{Address, Bytes, TxHash, B256, U256};
18use alloy_rpc_types_eth::{state::EvmOverrides, TransactionInfo};
19use futures::{Future, StreamExt};
20use reth_chain_state::CanonStateSubscriptions;
21use reth_primitives_traits::{
22 BlockBody, Recovered, RecoveredBlock, SignedTransaction, TxTy, WithEncoded,
23};
24use reth_rpc_convert::{transaction::RpcConvert, RpcTxReq, TransactionConversionError};
25use reth_rpc_eth_types::{
26 block::convert_transaction_receipt,
27 utils::{binary_search, recover_raw_transaction},
28 EthApiError::{self, TransactionConfirmationTimeout},
29 FillTransaction, SignError, TransactionSource,
30};
31use reth_storage_api::{
32 BlockNumReader, BlockReaderIdExt, ProviderBlock, ProviderReceipt, ProviderTx, ReceiptProvider,
33 TransactionsProvider,
34};
35use reth_transaction_pool::{
36 AddedTransactionOutcome, PoolPooledTx, PoolTransaction, TransactionOrigin, TransactionPool,
37};
38use std::{sync::Arc, time::Duration};
39
40pub trait EthTransactions: LoadTransaction<Provider: BlockReaderIdExt> {
    /// Returns the signer collection used by this trait's default signing methods.
    ///
    /// [`Self::accounts`] and [`Self::find_signer`] call `.read()` on the returned value and
    /// iterate over the signers it holds.
    fn signers(&self) -> &SignersForRpc<Self::Provider, Self::NetworkTypes>;
67
68 fn accounts(&self) -> Vec<Address> {
70 self.signers().read().iter().flat_map(|s| s.accounts()).collect()
71 }
72
    /// Returns how long [`Self::send_raw_transaction_sync`] waits for a submitted transaction to
    /// show up in a committed chain notification before giving up.
    fn send_raw_transaction_sync_timeout(&self) -> Duration;
75
76 fn send_raw_transaction(
80 &self,
81 tx: Bytes,
82 ) -> impl Future<Output = Result<B256, Self::Error>> + Send {
83 async move {
84 let recovered = recover_raw_transaction::<PoolPooledTx<Self::Pool>>(&tx)?;
85 self.send_transaction(TransactionOrigin::External, WithEncoded::new(tx, recovered))
86 .await
87 }
88 }
89
    /// Submits an already decoded, signer-recovered pooled transaction and resolves to its hash.
    ///
    /// `origin` tells the implementation where the transaction came from;
    /// [`Self::send_raw_transaction`] passes [`TransactionOrigin::External`]. `tx` carries the
    /// recovered transaction together with the encoded bytes it was decoded from.
    ///
    /// NOTE(review): presumably implementations insert the transaction into the pool — confirm
    /// against the implementors.
    fn send_transaction(
        &self,
        origin: TransactionOrigin,
        tx: WithEncoded<Recovered<PoolPooledTx<Self::Pool>>>,
    ) -> impl Future<Output = Result<B256, Self::Error>> + Send;
96
    /// Submits a raw transaction and waits until it is found in a committed chain notification,
    /// resolving to its RPC receipt.
    ///
    /// # Errors
    ///
    /// Resolves to an `EthApiError::TransactionConfirmationTimeout` error if the transaction is
    /// not observed within [`Self::send_raw_transaction_sync_timeout`], or if the notification
    /// stream ends first. Errors from submitting the transaction or from converting the receipt
    /// are propagated.
    fn send_raw_transaction_sync(
        &self,
        tx: Bytes,
    ) -> impl Future<Output = Result<RpcReceipt<Self::NetworkTypes>, Self::Error>> + Send
    where
        Self: LoadReceipt + 'static,
    {
        // The returned future owns its own clone instead of borrowing `self`.
        let this = self.clone();
        let timeout_duration = self.send_raw_transaction_sync_timeout();
        async move {
            // The stream is created before the transaction is submitted — presumably so that a
            // block committed right after submission cannot be missed (confirm intent).
            let mut stream = this.provider().canonical_state_stream();
            let hash = EthTransactions::send_raw_transaction(&this, tx).await?;
            tokio::time::timeout(timeout_duration, async {
                while let Some(notification) = stream.next().await {
                    let chain = notification.committed();
                    // Keep waiting unless this chain contains the transaction AND the receipt
                    // conversion yields a value; a conversion error ends the wait immediately.
                    if let Some((block, tx, receipt, all_receipts)) =
                        chain.find_transaction_and_receipt_by_hash(hash) &&
                        let Some(receipt) = convert_transaction_receipt(
                            block,
                            all_receipts,
                            tx,
                            receipt,
                            this.converter(),
                        )
                        .transpose()
                        .map_err(Self::Error::from)?
                    {
                        return Ok(receipt);
                    }
                }
                // The notification stream ended without the transaction being seen.
                Err(Self::Error::from_eth_err(TransactionConfirmationTimeout {
                    hash,
                    duration: timeout_duration,
                }))
            })
            .await
            // The timeout elapsed before the inner future completed.
            .unwrap_or_else(|_elapsed| {
                Err(Self::Error::from_eth_err(TransactionConfirmationTimeout {
                    hash,
                    duration: timeout_duration,
                }))
            })
        }
    }
144
145 #[expect(clippy::complexity)]
151 fn transaction_by_hash(
152 &self,
153 hash: B256,
154 ) -> impl Future<
155 Output = Result<Option<TransactionSource<ProviderTx<Self::Provider>>>, Self::Error>,
156 > + Send {
157 LoadTransaction::transaction_by_hash(self, hash)
158 }
159
160 fn pending_transactions(&self) -> Result<Vec<RpcTransaction<Self::NetworkTypes>>, Self::Error> {
162 self.pool()
163 .pending_transactions()
164 .into_iter()
165 .map(|tx| self.converter().fill_pending(tx.transaction.clone_into_consensus()))
166 .collect::<Result<Vec<_>, _>>()
167 .map_err(Self::Error::from)
168 }
169
170 #[expect(clippy::type_complexity)]
174 fn transactions_by_block(
175 &self,
176 block: B256,
177 ) -> impl Future<Output = Result<Option<Vec<ProviderTx<Self::Provider>>>, Self::Error>> + Send
178 {
179 async move {
180 self.cache()
181 .get_recovered_block(block)
182 .await
183 .map(|b| b.map(|b| b.body().transactions().to_vec()))
184 .map_err(Self::Error::from_eth_err)
185 }
186 }
187
188 fn raw_transaction_by_hash(
196 &self,
197 hash: B256,
198 ) -> impl Future<Output = Result<Option<Bytes>, Self::Error>> + Send {
199 async move {
200 if let Some(tx) =
202 self.pool().get_pooled_transaction_element(hash).map(|tx| tx.encoded_2718().into())
203 {
204 return Ok(Some(tx))
205 }
206
207 self.spawn_blocking_io(move |ref this| {
208 Ok(this
209 .provider()
210 .transaction_by_hash(hash)
211 .map_err(Self::Error::from_eth_err)?
212 .map(|tx| tx.encoded_2718().into()))
213 })
214 .await
215 }
216 }
217
218 #[expect(clippy::type_complexity)]
220 fn historical_transaction_by_hash_at(
221 &self,
222 hash: B256,
223 ) -> impl Future<
224 Output = Result<Option<(TransactionSource<ProviderTx<Self::Provider>>, B256)>, Self::Error>,
225 > + Send {
226 async move {
227 match self.transaction_by_hash_at(hash).await? {
228 None => Ok(None),
229 Some((tx, at)) => Ok(at.as_block_hash().map(|hash| (tx, hash))),
230 }
231 }
232 }
233
234 fn transaction_receipt(
239 &self,
240 hash: B256,
241 ) -> impl Future<Output = Result<Option<RpcReceipt<Self::NetworkTypes>>, Self::Error>> + Send
242 where
243 Self: LoadReceipt + 'static,
244 {
245 async move {
246 match self.load_transaction_and_receipt(hash).await? {
247 Some((tx, meta, receipt, all_receipts)) => {
248 self.build_transaction_receipt(tx, meta, receipt, all_receipts).await.map(Some)
249 }
250 None => Ok(None),
251 }
252 }
253 }
254
255 #[expect(clippy::complexity)]
259 fn load_transaction_and_receipt(
260 &self,
261 hash: TxHash,
262 ) -> impl Future<
263 Output = Result<
264 Option<(
265 Recovered<ProviderTx<Self::Provider>>,
266 TransactionMeta,
267 ProviderReceipt<Self::Provider>,
268 Option<Arc<Vec<ProviderReceipt<Self::Provider>>>>,
269 )>,
270 Self::Error,
271 >,
272 > + Send
273 where
274 Self: 'static,
275 {
276 async move {
277 if let Some(cached) = self.cache().get_transaction_by_hash(hash).await &&
278 let Some(tx) = cached.recovered_transaction().map(|tx| tx.cloned())
279 {
280 let meta = cached.transaction_meta(hash);
281
282 if let Some(all_receipts) = cached.receipts.clone() &&
284 let Some(receipt) = all_receipts.get(cached.tx_index).cloned()
285 {
286 return Ok(Some((tx, meta, receipt, Some(all_receipts))));
287 }
288
289 if let Some(receipts) = self
293 .cache()
294 .get_receipts(cached.block.hash())
295 .await
296 .map_err(Self::Error::from_eth_err)? &&
297 let Some(receipt) = receipts.get(cached.tx_index).cloned()
298 {
299 return Ok(Some((tx, meta, receipt, Some(receipts))));
300 }
301 }
302
303 self.spawn_blocking_io(move |this| {
305 let provider = this.provider();
306 let Some((tx, meta)) = provider
307 .transaction_by_hash_with_meta(hash)
308 .map_err(Self::Error::from_eth_err)?
309 else {
310 return Ok(None);
311 };
312
313 let tx = tx.try_into_recovered_unchecked().map_err(Self::Error::from_eth_err)?;
314
315 let receipt = provider.receipt_by_hash(hash).map_err(Self::Error::from_eth_err)?;
316
317 Ok(receipt.map(|receipt| (tx, meta, receipt, None)))
318 })
319 .await
320 }
321 }
322
323 fn transaction_by_block_and_tx_index(
327 &self,
328 block_id: BlockId,
329 index: usize,
330 ) -> impl Future<Output = Result<Option<RpcTransaction<Self::NetworkTypes>>, Self::Error>> + Send
331 where
332 Self: LoadBlock,
333 {
334 async move {
335 if let Some(block) = self.recovered_block(block_id).await? {
336 let block_hash = block.hash();
337 let block_number = block.number();
338 let block_timestamp = block.timestamp();
339 let base_fee_per_gas = block.base_fee_per_gas();
340 if let Some((signer, tx)) = block.transactions_with_sender().nth(index) {
341 let tx_info = TransactionInfo {
342 hash: Some(*tx.tx_hash()),
343 block_hash: Some(block_hash),
344 block_number: Some(block_number),
345 block_timestamp: Some(block_timestamp),
346 base_fee: base_fee_per_gas,
347 index: Some(index as u64),
348 };
349
350 return Ok(Some(
351 self.converter().fill(tx.clone().with_signer(*signer), tx_info)?,
352 ))
353 }
354 }
355
356 Ok(None)
357 }
358 }
359
    /// Finds the transaction sent by `sender` with the given `nonce`.
    ///
    /// When `include_pending` is set the pool is checked first. Otherwise (or on a pool miss) the
    /// sender's transaction count is used to decide whether the nonce has been used at all, and a
    /// binary search over block numbers locates the block to inspect.
    ///
    /// Resolves to `None` when `nonce` is greater than or equal to the sender's transaction
    /// count.
    ///
    /// # Errors
    ///
    /// Resolves to `EthApiError::HeaderNotFound` when the located block cannot be loaded, and
    /// also when the block is loaded but contains no transaction matching `sender` and `nonce`.
    fn get_transaction_by_sender_and_nonce(
        &self,
        sender: Address,
        nonce: u64,
        include_pending: bool,
    ) -> impl Future<Output = Result<Option<RpcTransaction<Self::NetworkTypes>>, Self::Error>> + Send
    where
        Self: LoadBlock + LoadState,
    {
        async move {
            // Pool lookup is only attempted when the caller asked for pending transactions.
            if include_pending &&
                let Some(tx) =
                    RpcNodeCore::pool(self).get_transaction_by_sender_and_nonce(sender, nonce)
            {
                let transaction = tx.transaction.clone_into_consensus();
                return Ok(Some(self.converter().fill_pending(transaction)?));
            }

            // Transaction count with no explicit block id.
            // NOTE(review): presumably this resolves to the latest state — confirm in `LoadState`.
            let highest = self.transaction_count(sender, None).await?.saturating_to::<u64>();

            // A nonce at or above the current count has not been used yet.
            if nonce >= highest {
                return Ok(None);
            }

            let high = self.provider().best_block_number().map_err(Self::Error::from_eth_err)?;

            // Search block numbers in `1..=high` with the predicate "sender's count at this block
            // exceeds `nonce`".
            // NOTE(review): presumably `binary_search` returns the first block for which the
            // predicate holds — confirm against its definition.
            let num = binary_search::<_, _, Self::Error>(1, high, |mid| async move {
                let mid_nonce =
                    self.transaction_count(sender, Some(mid.into())).await?.saturating_to::<u64>();

                Ok(mid_nonce > nonce)
            })
            .await?;

            let block_id = num.into();
            self.recovered_block(block_id)
                .await?
                .and_then(|block| {
                    let block_hash = block.hash();
                    let block_number = block.number();
                    let block_timestamp = block.timestamp();
                    let base_fee_per_gas = block.base_fee_per_gas();

                    block
                        .transactions_with_sender()
                        .enumerate()
                        .find(|(_, (signer, tx))| **signer == sender && (*tx).nonce() == nonce)
                        .map(|(index, (signer, tx))| {
                            let tx_info = TransactionInfo {
                                hash: Some(*tx.tx_hash()),
                                block_hash: Some(block_hash),
                                block_number: Some(block_number),
                                block_timestamp: Some(block_timestamp),
                                base_fee: base_fee_per_gas,
                                index: Some(index as u64),
                            };
                            Ok(self.converter().fill(tx.clone().with_signer(*signer), tx_info)?)
                        })
                })
                // `None` here means either no block or no matching transaction inside it.
                .ok_or(EthApiError::HeaderNotFound(block_id))?
                .map(Some)
        }
    }
432
433 fn raw_transaction_by_block_and_tx_index(
437 &self,
438 block_id: BlockId,
439 index: usize,
440 ) -> impl Future<Output = Result<Option<Bytes>, Self::Error>> + Send
441 where
442 Self: LoadBlock,
443 {
444 async move {
445 if let Some(block) = self.recovered_block(block_id).await? &&
446 let Some(tx) = block.body().transactions().get(index)
447 {
448 return Ok(Some(tx.encoded_2718().into()))
449 }
450
451 Ok(None)
452 }
453 }
454
455 fn send_transaction_request(
458 &self,
459 mut request: RpcTxReq<Self::NetworkTypes>,
460 ) -> impl Future<Output = Result<B256, Self::Error>> + Send
461 where
462 Self: EthApiSpec + LoadBlock + EstimateCall,
463 {
464 async move {
465 let from = match request.as_ref().from() {
466 Some(from) => from,
467 None => return Err(SignError::NoAccount.into_eth_err()),
468 };
469
470 if self.find_signer(&from).is_err() {
471 return Err(SignError::NoAccount.into_eth_err())
472 }
473
474 if request.as_ref().nonce().is_none() {
476 let nonce = self.next_available_nonce_for(&request).await?;
477 request.as_mut().set_nonce(nonce);
478 }
479
480 let chain_id = self.chain_id();
481 request.as_mut().set_chain_id(chain_id.to());
482
483 let estimated_gas = self
484 .estimate_gas_at(request.clone(), BlockId::pending(), EvmOverrides::default())
485 .await?;
486 let gas_limit = estimated_gas;
487 request.as_mut().set_gas_limit(gas_limit.to());
488
489 let transaction = self.sign_request(&from, request).await?.with_signer(from);
490
491 let pool_transaction =
492 <<Self as RpcNodeCore>::Pool as TransactionPool>::Transaction::try_from_consensus(
493 transaction,
494 )
495 .map_err(|e| {
496 Self::Error::from_eth_err(TransactionConversionError::Other(e.to_string()))
497 })?;
498
499 let AddedTransactionOutcome { hash, .. } = self
501 .pool()
502 .add_transaction(TransactionOrigin::Local, pool_transaction)
503 .await
504 .map_err(Self::Error::from_eth_err)?;
505
506 Ok(hash)
507 }
508 }
509
510 fn fill_transaction(
512 &self,
513 mut request: RpcTxReq<Self::NetworkTypes>,
514 ) -> impl Future<Output = Result<FillTransaction<TxTy<Self::Primitives>>, Self::Error>> + Send
515 where
516 Self: EthApiSpec + LoadBlock + EstimateCall + LoadFee,
517 {
518 async move {
519 if request.as_ref().value().is_none() {
520 request.as_mut().set_value(U256::ZERO);
521 }
522
523 if request.as_ref().nonce().is_none() {
524 let nonce = self.next_available_nonce_for(&request).await?;
525 request.as_mut().set_nonce(nonce);
526 }
527
528 let chain_id = self.chain_id();
529 request.as_mut().set_chain_id(chain_id.to());
530
531 if request.as_ref().has_eip4844_fields() &&
532 request.as_ref().max_fee_per_blob_gas().is_none()
533 {
534 let blob_fee = self.blob_base_fee().await?;
535 request.as_mut().set_max_fee_per_blob_gas(blob_fee.to());
536 }
537
538 if request.as_ref().sidecar.is_some() &&
541 request.as_ref().blob_versioned_hashes.is_none()
542 {
543 request.as_mut().populate_blob_hashes();
544 }
545
546 if request.as_ref().gas_limit().is_none() {
547 let estimated_gas = self
548 .estimate_gas_at(request.clone(), BlockId::pending(), EvmOverrides::default())
549 .await?;
550 request.as_mut().set_gas_limit(estimated_gas.to());
551 }
552
553 if request.as_ref().gas_price().is_none() {
554 let tip = if let Some(tip) = request.as_ref().max_priority_fee_per_gas() {
555 tip
556 } else {
557 let tip = self.suggested_priority_fee().await?.to::<u128>();
558 request.as_mut().set_max_priority_fee_per_gas(tip);
559 tip
560 };
561 if request.as_ref().max_fee_per_gas().is_none() {
562 let header =
563 self.provider().latest_header().map_err(Self::Error::from_eth_err)?;
564 let base_fee = header.and_then(|h| h.base_fee_per_gas()).unwrap_or_default();
565 request.as_mut().set_max_fee_per_gas(base_fee as u128 + tip);
566 }
567 }
568
569 let tx = self.converter().build_simulate_v1_transaction(request)?;
570
571 let raw = tx.encoded_2718().into();
572
573 Ok(FillTransaction { raw, tx })
574 }
575 }
576
577 fn sign_request(
579 &self,
580 from: &Address,
581 txn: RpcTxReq<Self::NetworkTypes>,
582 ) -> impl Future<Output = Result<ProviderTx<Self::Provider>, Self::Error>> + Send {
583 async move {
584 self.find_signer(from)?
585 .sign_transaction(txn, from)
586 .await
587 .map_err(Self::Error::from_eth_err)
588 }
589 }
590
591 fn sign(
593 &self,
594 account: Address,
595 message: Bytes,
596 ) -> impl Future<Output = Result<Bytes, Self::Error>> + Send {
597 async move {
598 Ok(self
599 .find_signer(&account)?
600 .sign(account, &message)
601 .await
602 .map_err(Self::Error::from_eth_err)?
603 .as_bytes()
604 .into())
605 }
606 }
607
608 fn sign_transaction(
611 &self,
612 request: RpcTxReq<Self::NetworkTypes>,
613 ) -> impl Future<Output = Result<Bytes, Self::Error>> + Send {
614 async move {
615 let from = match request.as_ref().from() {
616 Some(from) => from,
617 None => return Err(SignError::NoAccount.into_eth_err()),
618 };
619
620 Ok(self.sign_request(&from, request).await?.encoded_2718().into())
621 }
622 }
623
624 fn sign_typed_data(&self, data: &TypedData, account: Address) -> Result<Bytes, Self::Error> {
626 Ok(self
627 .find_signer(&account)?
628 .sign_typed_data(account, data)
629 .map_err(Self::Error::from_eth_err)?
630 .as_bytes()
631 .into())
632 }
633
634 #[expect(clippy::type_complexity)]
636 fn find_signer(
637 &self,
638 account: &Address,
639 ) -> Result<
640 Box<dyn EthSigner<ProviderTx<Self::Provider>, RpcTxReq<Self::NetworkTypes>> + 'static>,
641 Self::Error,
642 > {
643 self.signers()
644 .read()
645 .iter()
646 .find(|signer| signer.is_signer_for(account))
647 .map(|signer| dyn_clone::clone_box(&**signer))
648 .ok_or_else(|| SignError::NoAccount.into_eth_err())
649 }
650}
651
652pub trait LoadTransaction: SpawnBlocking + FullEthApiTypes + RpcNodeCoreExt {
657 #[expect(clippy::complexity)]
663 fn transaction_by_hash(
664 &self,
665 hash: B256,
666 ) -> impl Future<
667 Output = Result<Option<TransactionSource<ProviderTx<Self::Provider>>>, Self::Error>,
668 > + Send {
669 async move {
670 if let Some(cached) = self.cache().get_transaction_by_hash(hash).await &&
672 let Some(source) = cached.to_transaction_source()
673 {
674 return Ok(Some(source));
675 }
676
677 if let Some((tx, meta)) = self
679 .spawn_blocking_io(move |this| {
680 this.provider()
681 .transaction_by_hash_with_meta(hash)
682 .map_err(Self::Error::from_eth_err)
683 })
684 .await?
685 {
686 let transaction = tx
690 .try_into_recovered_unchecked()
691 .map_err(|_| EthApiError::InvalidTransactionSignature)?;
692
693 return Ok(Some(TransactionSource::Block {
694 transaction,
695 index: meta.index,
696 block_hash: meta.block_hash,
697 block_number: meta.block_number,
698 block_timestamp: meta.timestamp,
699 base_fee: meta.base_fee,
700 }));
701 }
702
703 if let Some(tx) = self.pool().get(&hash).map(|tx| tx.transaction.clone_into_consensus())
705 {
706 return Ok(Some(TransactionSource::Pool(tx.into())));
707 }
708
709 Ok(None)
710 }
711 }
712
713 #[expect(clippy::type_complexity)]
717 fn transaction_by_hash_at(
718 &self,
719 transaction_hash: B256,
720 ) -> impl Future<
721 Output = Result<
722 Option<(TransactionSource<ProviderTx<Self::Provider>>, BlockId)>,
723 Self::Error,
724 >,
725 > + Send {
726 async move {
727 Ok(self.transaction_by_hash(transaction_hash).await?.map(|tx| match tx {
728 tx @ TransactionSource::Pool(_) => (tx, BlockId::pending()),
729 tx @ TransactionSource::Block { block_hash, .. } => {
730 (tx, BlockId::Hash(block_hash.into()))
731 }
732 }))
733 }
734 }
735
736 #[expect(clippy::type_complexity)]
738 fn transaction_and_block(
739 &self,
740 hash: B256,
741 ) -> impl Future<
742 Output = Result<
743 Option<(
744 TransactionSource<ProviderTx<Self::Provider>>,
745 Arc<RecoveredBlock<ProviderBlock<Self::Provider>>>,
746 )>,
747 Self::Error,
748 >,
749 > + Send {
750 async move {
751 let (transaction, at) = match self.transaction_by_hash_at(hash).await? {
752 None => return Ok(None),
753 Some(res) => res,
754 };
755
756 let block_hash = match at {
758 BlockId::Hash(hash) => hash.block_hash,
759 _ => return Ok(None),
760 };
761 let block = self
762 .cache()
763 .get_recovered_block(block_hash)
764 .await
765 .map_err(Self::Error::from_eth_err)?;
766 Ok(block.map(|block| (transaction, block)))
767 }
768 }
769}