reth_rpc_eth_api/helpers/
transaction.rs1use super::{EthApiSpec, EthSigner, LoadBlock, LoadFee, LoadReceipt, LoadState, SpawnBlocking};
5use crate::{
6 helpers::{estimate::EstimateCall, spec::SignersForRpc},
7 FromEthApiError, FullEthApiTypes, IntoEthApiError, RpcNodeCore, RpcNodeCoreExt, RpcReceipt,
8 RpcTransaction,
9};
10use alloy_consensus::{
11 transaction::{SignerRecoverable, TransactionMeta, TxHashRef},
12 BlockHeader, Transaction,
13};
14use alloy_dyn_abi::TypedData;
15use alloy_eips::{eip2718::Encodable2718, BlockId};
16use alloy_network::{TransactionBuilder, TransactionBuilder4844};
17use alloy_primitives::{Address, Bytes, TxHash, B256, U256};
18use alloy_rpc_types_eth::{BlockNumberOrTag, TransactionInfo};
19use futures::{Future, StreamExt};
20use reth_chain_state::CanonStateSubscriptions;
21use reth_node_api::BlockBody;
22use reth_primitives_traits::{RecoveredBlock, SignedTransaction, TxTy};
23use reth_rpc_convert::{transaction::RpcConvert, RpcTxReq};
24use reth_rpc_eth_types::{
25 utils::binary_search, EthApiError, EthApiError::TransactionConfirmationTimeout,
26 FillTransactionResult, SignError, TransactionSource,
27};
28use reth_storage_api::{
29 BlockNumReader, BlockReaderIdExt, ProviderBlock, ProviderReceipt, ProviderTx, ReceiptProvider,
30 TransactionsProvider,
31};
32use reth_transaction_pool::{
33 AddedTransactionOutcome, PoolTransaction, TransactionOrigin, TransactionPool,
34};
35use std::{sync::Arc, time::Duration};
36
37pub trait EthTransactions: LoadTransaction<Provider: BlockReaderIdExt> {
60 fn signers(&self) -> &SignersForRpc<Self::Provider, Self::NetworkTypes>;
64
65 fn accounts(&self) -> Vec<Address> {
67 self.signers().read().iter().flat_map(|s| s.accounts()).collect()
68 }
69
70 fn send_raw_transaction_sync_timeout(&self) -> Duration;
72
73 fn send_raw_transaction(
77 &self,
78 tx: Bytes,
79 ) -> impl Future<Output = Result<B256, Self::Error>> + Send;
80
81 fn send_raw_transaction_sync(
85 &self,
86 tx: Bytes,
87 ) -> impl Future<Output = Result<RpcReceipt<Self::NetworkTypes>, Self::Error>> + Send
88 where
89 Self: LoadReceipt + 'static,
90 {
91 let this = self.clone();
92 let timeout_duration = self.send_raw_transaction_sync_timeout();
93 async move {
94 let mut stream = this.provider().canonical_state_stream();
95 let hash = EthTransactions::send_raw_transaction(&this, tx).await?;
96 tokio::time::timeout(timeout_duration, async {
97 while let Some(notification) = stream.next().await {
98 let chain = notification.committed();
99 for block in chain.blocks_iter() {
100 if block.body().contains_transaction(&hash) &&
101 let Some(receipt) = this.transaction_receipt(hash).await?
102 {
103 return Ok(receipt);
104 }
105 }
106 }
107 Err(Self::Error::from_eth_err(TransactionConfirmationTimeout {
108 hash,
109 duration: timeout_duration,
110 }))
111 })
112 .await
113 .unwrap_or_else(|_elapsed| {
114 Err(Self::Error::from_eth_err(TransactionConfirmationTimeout {
115 hash,
116 duration: timeout_duration,
117 }))
118 })
119 }
120 }
121
122 #[expect(clippy::complexity)]
128 fn transaction_by_hash(
129 &self,
130 hash: B256,
131 ) -> impl Future<
132 Output = Result<Option<TransactionSource<ProviderTx<Self::Provider>>>, Self::Error>,
133 > + Send {
134 LoadTransaction::transaction_by_hash(self, hash)
135 }
136
137 #[expect(clippy::type_complexity)]
141 fn transactions_by_block(
142 &self,
143 block: B256,
144 ) -> impl Future<Output = Result<Option<Vec<ProviderTx<Self::Provider>>>, Self::Error>> + Send
145 {
146 async move {
147 self.cache()
148 .get_recovered_block(block)
149 .await
150 .map(|b| b.map(|b| b.body().transactions().to_vec()))
151 .map_err(Self::Error::from_eth_err)
152 }
153 }
154
155 fn raw_transaction_by_hash(
163 &self,
164 hash: B256,
165 ) -> impl Future<Output = Result<Option<Bytes>, Self::Error>> + Send {
166 async move {
167 if let Some(tx) =
169 self.pool().get_pooled_transaction_element(hash).map(|tx| tx.encoded_2718().into())
170 {
171 return Ok(Some(tx))
172 }
173
174 self.spawn_blocking_io(move |ref this| {
175 Ok(this
176 .provider()
177 .transaction_by_hash(hash)
178 .map_err(Self::Error::from_eth_err)?
179 .map(|tx| tx.encoded_2718().into()))
180 })
181 .await
182 }
183 }
184
185 #[expect(clippy::type_complexity)]
187 fn historical_transaction_by_hash_at(
188 &self,
189 hash: B256,
190 ) -> impl Future<
191 Output = Result<Option<(TransactionSource<ProviderTx<Self::Provider>>, B256)>, Self::Error>,
192 > + Send {
193 async move {
194 match self.transaction_by_hash_at(hash).await? {
195 None => Ok(None),
196 Some((tx, at)) => Ok(at.as_block_hash().map(|hash| (tx, hash))),
197 }
198 }
199 }
200
201 fn transaction_receipt(
206 &self,
207 hash: B256,
208 ) -> impl Future<Output = Result<Option<RpcReceipt<Self::NetworkTypes>>, Self::Error>> + Send
209 where
210 Self: LoadReceipt + 'static,
211 {
212 async move {
213 match self.load_transaction_and_receipt(hash).await? {
214 Some((tx, meta, receipt)) => {
215 self.build_transaction_receipt(tx, meta, receipt).await.map(Some)
216 }
217 None => Ok(None),
218 }
219 }
220 }
221
222 #[expect(clippy::complexity)]
224 fn load_transaction_and_receipt(
225 &self,
226 hash: TxHash,
227 ) -> impl Future<
228 Output = Result<
229 Option<(ProviderTx<Self::Provider>, TransactionMeta, ProviderReceipt<Self::Provider>)>,
230 Self::Error,
231 >,
232 > + Send
233 where
234 Self: 'static,
235 {
236 self.spawn_blocking_io(move |this| {
237 let provider = this.provider();
238 let (tx, meta) = match provider
239 .transaction_by_hash_with_meta(hash)
240 .map_err(Self::Error::from_eth_err)?
241 {
242 Some((tx, meta)) => (tx, meta),
243 None => return Ok(None),
244 };
245
246 let receipt = match provider.receipt_by_hash(hash).map_err(Self::Error::from_eth_err)? {
247 Some(recpt) => recpt,
248 None => return Ok(None),
249 };
250
251 Ok(Some((tx, meta, receipt)))
252 })
253 }
254
255 fn transaction_by_block_and_tx_index(
259 &self,
260 block_id: BlockId,
261 index: usize,
262 ) -> impl Future<Output = Result<Option<RpcTransaction<Self::NetworkTypes>>, Self::Error>> + Send
263 where
264 Self: LoadBlock,
265 {
266 async move {
267 if let Some(block) = self.recovered_block(block_id).await? {
268 let block_hash = block.hash();
269 let block_number = block.number();
270 let base_fee_per_gas = block.base_fee_per_gas();
271 if let Some((signer, tx)) = block.transactions_with_sender().nth(index) {
272 let tx_info = TransactionInfo {
273 hash: Some(*tx.tx_hash()),
274 block_hash: Some(block_hash),
275 block_number: Some(block_number),
276 base_fee: base_fee_per_gas,
277 index: Some(index as u64),
278 };
279
280 return Ok(Some(
281 self.tx_resp_builder().fill(tx.clone().with_signer(*signer), tx_info)?,
282 ))
283 }
284 }
285
286 Ok(None)
287 }
288 }
289
290 fn get_transaction_by_sender_and_nonce(
292 &self,
293 sender: Address,
294 nonce: u64,
295 include_pending: bool,
296 ) -> impl Future<Output = Result<Option<RpcTransaction<Self::NetworkTypes>>, Self::Error>> + Send
297 where
298 Self: LoadBlock + LoadState,
299 {
300 async move {
301 if include_pending &&
303 let Some(tx) =
304 RpcNodeCore::pool(self).get_transaction_by_sender_and_nonce(sender, nonce)
305 {
306 let transaction = tx.transaction.clone_into_consensus();
307 return Ok(Some(self.tx_resp_builder().fill_pending(transaction)?));
308 }
309
310 if !self.get_code(sender, None).await?.is_empty() {
312 return Ok(None);
313 }
314
315 let highest = self.transaction_count(sender, None).await?.saturating_to::<u64>();
316
317 if nonce >= highest {
320 return Ok(None);
321 }
322
323 let Ok(high) = self.provider().best_block_number() else {
324 return Err(EthApiError::HeaderNotFound(BlockNumberOrTag::Latest.into()).into());
325 };
326
327 let num = binary_search::<_, _, Self::Error>(1, high, |mid| async move {
330 let mid_nonce =
331 self.transaction_count(sender, Some(mid.into())).await?.saturating_to::<u64>();
332
333 Ok(mid_nonce > nonce)
334 })
335 .await?;
336
337 let block_id = num.into();
338 self.recovered_block(block_id)
339 .await?
340 .and_then(|block| {
341 let block_hash = block.hash();
342 let block_number = block.number();
343 let base_fee_per_gas = block.base_fee_per_gas();
344
345 block
346 .transactions_with_sender()
347 .enumerate()
348 .find(|(_, (signer, tx))| **signer == sender && (*tx).nonce() == nonce)
349 .map(|(index, (signer, tx))| {
350 let tx_info = TransactionInfo {
351 hash: Some(*tx.tx_hash()),
352 block_hash: Some(block_hash),
353 block_number: Some(block_number),
354 base_fee: base_fee_per_gas,
355 index: Some(index as u64),
356 };
357 self.tx_resp_builder().fill(tx.clone().with_signer(*signer), tx_info)
358 })
359 })
360 .ok_or(EthApiError::HeaderNotFound(block_id))?
361 .map(Some)
362 }
363 }
364
365 fn raw_transaction_by_block_and_tx_index(
369 &self,
370 block_id: BlockId,
371 index: usize,
372 ) -> impl Future<Output = Result<Option<Bytes>, Self::Error>> + Send
373 where
374 Self: LoadBlock,
375 {
376 async move {
377 if let Some(block) = self.recovered_block(block_id).await? &&
378 let Some(tx) = block.body().transactions().get(index)
379 {
380 return Ok(Some(tx.encoded_2718().into()))
381 }
382
383 Ok(None)
384 }
385 }
386
387 fn send_transaction(
390 &self,
391 mut request: RpcTxReq<Self::NetworkTypes>,
392 ) -> impl Future<Output = Result<B256, Self::Error>> + Send
393 where
394 Self: EthApiSpec + LoadBlock + EstimateCall,
395 {
396 async move {
397 let from = match request.as_ref().from() {
398 Some(from) => from,
399 None => return Err(SignError::NoAccount.into_eth_err()),
400 };
401
402 if self.find_signer(&from).is_err() {
403 return Err(SignError::NoAccount.into_eth_err())
404 }
405
406 if request.as_ref().nonce().is_none() {
408 let nonce = self.next_available_nonce(from).await?;
409 request.as_mut().set_nonce(nonce);
410 }
411
412 let chain_id = self.chain_id();
413 request.as_mut().set_chain_id(chain_id.to());
414
415 let estimated_gas =
416 self.estimate_gas_at(request.clone(), BlockId::pending(), None).await?;
417 let gas_limit = estimated_gas;
418 request.as_mut().set_gas_limit(gas_limit.to());
419
420 let transaction = self.sign_request(&from, request).await?.with_signer(from);
421
422 let pool_transaction =
423 <<Self as RpcNodeCore>::Pool as TransactionPool>::Transaction::try_from_consensus(
424 transaction,
425 )
426 .map_err(|_| EthApiError::TransactionConversionError)?;
427
428 let AddedTransactionOutcome { hash, .. } = self
430 .pool()
431 .add_transaction(TransactionOrigin::Local, pool_transaction)
432 .await
433 .map_err(Self::Error::from_eth_err)?;
434
435 Ok(hash)
436 }
437 }
438
439 fn fill_transaction(
441 &self,
442 mut request: RpcTxReq<Self::NetworkTypes>,
443 ) -> impl Future<Output = Result<FillTransactionResult<TxTy<Self::Primitives>>, Self::Error>> + Send
444 where
445 Self: EthApiSpec + LoadBlock + EstimateCall + LoadFee,
446 {
447 async move {
448 let from = match request.as_ref().from() {
449 Some(from) => from,
450 None => return Err(SignError::NoAccount.into_eth_err()),
451 };
452
453 if request.as_ref().value().is_none() {
454 request.as_mut().set_value(U256::ZERO);
455 }
456
457 if request.as_ref().nonce().is_none() {
458 let nonce = self.next_available_nonce(from).await?;
459 request.as_mut().set_nonce(nonce);
460 }
461
462 let chain_id = self.chain_id();
463 request.as_mut().set_chain_id(chain_id.to());
464
465 if request.as_ref().has_eip4844_fields() &&
466 request.as_ref().max_fee_per_blob_gas().is_none()
467 {
468 let blob_fee = self.blob_base_fee().await?;
469 request.as_mut().set_max_fee_per_blob_gas(blob_fee.to());
470 }
471
472 if request.as_ref().blob_sidecar().is_some() &&
473 request.as_ref().blob_versioned_hashes.is_none()
474 {
475 request.as_mut().populate_blob_hashes();
476 }
477
478 if request.as_ref().gas_limit().is_none() {
479 let estimated_gas =
480 self.estimate_gas_at(request.clone(), BlockId::pending(), None).await?;
481 request.as_mut().set_gas_limit(estimated_gas.to());
482 }
483
484 if request.as_ref().gas_price().is_none() {
485 let tip = if let Some(tip) = request.as_ref().max_priority_fee_per_gas() {
486 tip
487 } else {
488 let tip = self.suggested_priority_fee().await?.to::<u128>();
489 request.as_mut().set_max_priority_fee_per_gas(tip);
490 tip
491 };
492 if request.as_ref().max_fee_per_gas().is_none() {
493 let header =
494 self.provider().latest_header().map_err(Self::Error::from_eth_err)?;
495 let base_fee = header.and_then(|h| h.base_fee_per_gas()).unwrap_or_default();
496 request.as_mut().set_max_fee_per_gas(base_fee as u128 + tip);
497 }
498 }
499
500 let tx = self.tx_resp_builder().build_simulate_v1_transaction(request)?;
501
502 let raw = tx.encoded_2718().into();
503
504 Ok(FillTransactionResult { raw, tx })
505 }
506 }
507
508 fn sign_request(
510 &self,
511 from: &Address,
512 txn: RpcTxReq<Self::NetworkTypes>,
513 ) -> impl Future<Output = Result<ProviderTx<Self::Provider>, Self::Error>> + Send {
514 async move {
515 self.find_signer(from)?
516 .sign_transaction(txn, from)
517 .await
518 .map_err(Self::Error::from_eth_err)
519 }
520 }
521
522 fn sign(
524 &self,
525 account: Address,
526 message: Bytes,
527 ) -> impl Future<Output = Result<Bytes, Self::Error>> + Send {
528 async move {
529 Ok(self
530 .find_signer(&account)?
531 .sign(account, &message)
532 .await
533 .map_err(Self::Error::from_eth_err)?
534 .as_bytes()
535 .into())
536 }
537 }
538
539 fn sign_transaction(
542 &self,
543 request: RpcTxReq<Self::NetworkTypes>,
544 ) -> impl Future<Output = Result<Bytes, Self::Error>> + Send {
545 async move {
546 let from = match request.as_ref().from() {
547 Some(from) => from,
548 None => return Err(SignError::NoAccount.into_eth_err()),
549 };
550
551 Ok(self.sign_request(&from, request).await?.encoded_2718().into())
552 }
553 }
554
555 fn sign_typed_data(&self, data: &TypedData, account: Address) -> Result<Bytes, Self::Error> {
557 Ok(self
558 .find_signer(&account)?
559 .sign_typed_data(account, data)
560 .map_err(Self::Error::from_eth_err)?
561 .as_bytes()
562 .into())
563 }
564
565 #[expect(clippy::type_complexity)]
567 fn find_signer(
568 &self,
569 account: &Address,
570 ) -> Result<
571 Box<dyn EthSigner<ProviderTx<Self::Provider>, RpcTxReq<Self::NetworkTypes>> + 'static>,
572 Self::Error,
573 > {
574 self.signers()
575 .read()
576 .iter()
577 .find(|signer| signer.is_signer_for(account))
578 .map(|signer| dyn_clone::clone_box(&**signer))
579 .ok_or_else(|| SignError::NoAccount.into_eth_err())
580 }
581}
582
583pub trait LoadTransaction: SpawnBlocking + FullEthApiTypes + RpcNodeCoreExt {
588 #[expect(clippy::complexity)]
594 fn transaction_by_hash(
595 &self,
596 hash: B256,
597 ) -> impl Future<
598 Output = Result<Option<TransactionSource<ProviderTx<Self::Provider>>>, Self::Error>,
599 > + Send {
600 async move {
601 let mut resp = self
603 .spawn_blocking_io(move |this| {
604 match this
605 .provider()
606 .transaction_by_hash_with_meta(hash)
607 .map_err(Self::Error::from_eth_err)?
608 {
609 None => Ok(None),
610 Some((tx, meta)) => {
611 let transaction = tx
615 .try_into_recovered_unchecked()
616 .map_err(|_| EthApiError::InvalidTransactionSignature)?;
617
618 let tx = TransactionSource::Block {
619 transaction,
620 index: meta.index,
621 block_hash: meta.block_hash,
622 block_number: meta.block_number,
623 base_fee: meta.base_fee,
624 };
625 Ok(Some(tx))
626 }
627 }
628 })
629 .await?;
630
631 if resp.is_none() {
632 if let Some(tx) =
634 self.pool().get(&hash).map(|tx| tx.transaction.clone().into_consensus())
635 {
636 resp = Some(TransactionSource::Pool(tx.into()));
637 }
638 }
639
640 Ok(resp)
641 }
642 }
643
644 #[expect(clippy::type_complexity)]
648 fn transaction_by_hash_at(
649 &self,
650 transaction_hash: B256,
651 ) -> impl Future<
652 Output = Result<
653 Option<(TransactionSource<ProviderTx<Self::Provider>>, BlockId)>,
654 Self::Error,
655 >,
656 > + Send {
657 async move {
658 Ok(self.transaction_by_hash(transaction_hash).await?.map(|tx| match tx {
659 tx @ TransactionSource::Pool(_) => (tx, BlockId::pending()),
660 tx @ TransactionSource::Block { block_hash, .. } => {
661 (tx, BlockId::Hash(block_hash.into()))
662 }
663 }))
664 }
665 }
666
667 #[expect(clippy::type_complexity)]
669 fn transaction_and_block(
670 &self,
671 hash: B256,
672 ) -> impl Future<
673 Output = Result<
674 Option<(
675 TransactionSource<ProviderTx<Self::Provider>>,
676 Arc<RecoveredBlock<ProviderBlock<Self::Provider>>>,
677 )>,
678 Self::Error,
679 >,
680 > + Send {
681 async move {
682 let (transaction, at) = match self.transaction_by_hash_at(hash).await? {
683 None => return Ok(None),
684 Some(res) => res,
685 };
686
687 let block_hash = match at {
689 BlockId::Hash(hash) => hash.block_hash,
690 _ => return Ok(None),
691 };
692 let block = self
693 .cache()
694 .get_recovered_block(block_hash)
695 .await
696 .map_err(Self::Error::from_eth_err)?;
697 Ok(block.map(|block| (transaction, block)))
698 }
699 }
700}