reth_rpc_eth_api/helpers/
transaction.rs1use super::{EthApiSpec, EthSigner, LoadBlock, LoadFee, LoadReceipt, LoadState, SpawnBlocking};
5use crate::{
6 helpers::{estimate::EstimateCall, spec::SignersForRpc},
7 FromEthApiError, FullEthApiTypes, IntoEthApiError, RpcNodeCore, RpcNodeCoreExt, RpcReceipt,
8 RpcTransaction,
9};
10use alloy_consensus::{
11 transaction::{SignerRecoverable, TransactionMeta, TxHashRef},
12 BlockHeader, Transaction,
13};
14use alloy_dyn_abi::TypedData;
15use alloy_eips::{eip2718::Encodable2718, BlockId};
16use alloy_network::{TransactionBuilder, TransactionBuilder4844};
17use alloy_primitives::{Address, Bytes, TxHash, B256, U256};
18use alloy_rpc_types_eth::{BlockNumberOrTag, TransactionInfo};
19use futures::{Future, StreamExt};
20use reth_chain_state::CanonStateSubscriptions;
21use reth_node_api::BlockBody;
22use reth_primitives_traits::{Recovered, RecoveredBlock, SignedTransaction, TxTy, WithEncoded};
23use reth_rpc_convert::{transaction::RpcConvert, RpcTxReq, TransactionConversionError};
24use reth_rpc_eth_types::{
25 utils::{binary_search, recover_raw_transaction},
26 EthApiError::{self, TransactionConfirmationTimeout},
27 FillTransaction, SignError, TransactionSource,
28};
29use reth_storage_api::{
30 BlockNumReader, BlockReaderIdExt, ProviderBlock, ProviderReceipt, ProviderTx, ReceiptProvider,
31 TransactionsProvider,
32};
33use reth_transaction_pool::{
34 AddedTransactionOutcome, PoolPooledTx, PoolTransaction, TransactionOrigin, TransactionPool,
35};
36use std::{sync::Arc, time::Duration};
37
38pub trait EthTransactions: LoadTransaction<Provider: BlockReaderIdExt> {
61 fn signers(&self) -> &SignersForRpc<Self::Provider, Self::NetworkTypes>;
65
66 fn accounts(&self) -> Vec<Address> {
68 self.signers().read().iter().flat_map(|s| s.accounts()).collect()
69 }
70
71 fn send_raw_transaction_sync_timeout(&self) -> Duration;
73
74 fn send_raw_transaction(
78 &self,
79 tx: Bytes,
80 ) -> impl Future<Output = Result<B256, Self::Error>> + Send {
81 async move {
82 let recovered = recover_raw_transaction::<PoolPooledTx<Self::Pool>>(&tx)?;
83 self.send_transaction(WithEncoded::new(tx, recovered)).await
84 }
85 }
86
87 fn send_transaction(
89 &self,
90 tx: WithEncoded<Recovered<PoolPooledTx<Self::Pool>>>,
91 ) -> impl Future<Output = Result<B256, Self::Error>> + Send;
92
93 fn send_raw_transaction_sync(
97 &self,
98 tx: Bytes,
99 ) -> impl Future<Output = Result<RpcReceipt<Self::NetworkTypes>, Self::Error>> + Send
100 where
101 Self: LoadReceipt + 'static,
102 {
103 let this = self.clone();
104 let timeout_duration = self.send_raw_transaction_sync_timeout();
105 async move {
106 let mut stream = this.provider().canonical_state_stream();
107 let hash = EthTransactions::send_raw_transaction(&this, tx).await?;
108 tokio::time::timeout(timeout_duration, async {
109 while let Some(notification) = stream.next().await {
110 let chain = notification.committed();
111 for block in chain.blocks_iter() {
112 if block.body().contains_transaction(&hash) &&
113 let Some(receipt) = this.transaction_receipt(hash).await?
114 {
115 return Ok(receipt);
116 }
117 }
118 }
119 Err(Self::Error::from_eth_err(TransactionConfirmationTimeout {
120 hash,
121 duration: timeout_duration,
122 }))
123 })
124 .await
125 .unwrap_or_else(|_elapsed| {
126 Err(Self::Error::from_eth_err(TransactionConfirmationTimeout {
127 hash,
128 duration: timeout_duration,
129 }))
130 })
131 }
132 }
133
134 #[expect(clippy::complexity)]
140 fn transaction_by_hash(
141 &self,
142 hash: B256,
143 ) -> impl Future<
144 Output = Result<Option<TransactionSource<ProviderTx<Self::Provider>>>, Self::Error>,
145 > + Send {
146 LoadTransaction::transaction_by_hash(self, hash)
147 }
148
149 #[expect(clippy::type_complexity)]
153 fn transactions_by_block(
154 &self,
155 block: B256,
156 ) -> impl Future<Output = Result<Option<Vec<ProviderTx<Self::Provider>>>, Self::Error>> + Send
157 {
158 async move {
159 self.cache()
160 .get_recovered_block(block)
161 .await
162 .map(|b| b.map(|b| b.body().transactions().to_vec()))
163 .map_err(Self::Error::from_eth_err)
164 }
165 }
166
167 fn raw_transaction_by_hash(
175 &self,
176 hash: B256,
177 ) -> impl Future<Output = Result<Option<Bytes>, Self::Error>> + Send {
178 async move {
179 if let Some(tx) =
181 self.pool().get_pooled_transaction_element(hash).map(|tx| tx.encoded_2718().into())
182 {
183 return Ok(Some(tx))
184 }
185
186 self.spawn_blocking_io(move |ref this| {
187 Ok(this
188 .provider()
189 .transaction_by_hash(hash)
190 .map_err(Self::Error::from_eth_err)?
191 .map(|tx| tx.encoded_2718().into()))
192 })
193 .await
194 }
195 }
196
197 #[expect(clippy::type_complexity)]
199 fn historical_transaction_by_hash_at(
200 &self,
201 hash: B256,
202 ) -> impl Future<
203 Output = Result<Option<(TransactionSource<ProviderTx<Self::Provider>>, B256)>, Self::Error>,
204 > + Send {
205 async move {
206 match self.transaction_by_hash_at(hash).await? {
207 None => Ok(None),
208 Some((tx, at)) => Ok(at.as_block_hash().map(|hash| (tx, hash))),
209 }
210 }
211 }
212
213 fn transaction_receipt(
218 &self,
219 hash: B256,
220 ) -> impl Future<Output = Result<Option<RpcReceipt<Self::NetworkTypes>>, Self::Error>> + Send
221 where
222 Self: LoadReceipt + 'static,
223 {
224 async move {
225 match self.load_transaction_and_receipt(hash).await? {
226 Some((tx, meta, receipt)) => {
227 self.build_transaction_receipt(tx, meta, receipt).await.map(Some)
228 }
229 None => Ok(None),
230 }
231 }
232 }
233
234 #[expect(clippy::complexity)]
236 fn load_transaction_and_receipt(
237 &self,
238 hash: TxHash,
239 ) -> impl Future<
240 Output = Result<
241 Option<(ProviderTx<Self::Provider>, TransactionMeta, ProviderReceipt<Self::Provider>)>,
242 Self::Error,
243 >,
244 > + Send
245 where
246 Self: 'static,
247 {
248 self.spawn_blocking_io(move |this| {
249 let provider = this.provider();
250 let (tx, meta) = match provider
251 .transaction_by_hash_with_meta(hash)
252 .map_err(Self::Error::from_eth_err)?
253 {
254 Some((tx, meta)) => (tx, meta),
255 None => return Ok(None),
256 };
257
258 let receipt = match provider.receipt_by_hash(hash).map_err(Self::Error::from_eth_err)? {
259 Some(recpt) => recpt,
260 None => return Ok(None),
261 };
262
263 Ok(Some((tx, meta, receipt)))
264 })
265 }
266
267 fn transaction_by_block_and_tx_index(
271 &self,
272 block_id: BlockId,
273 index: usize,
274 ) -> impl Future<Output = Result<Option<RpcTransaction<Self::NetworkTypes>>, Self::Error>> + Send
275 where
276 Self: LoadBlock,
277 {
278 async move {
279 if let Some(block) = self.recovered_block(block_id).await? {
280 let block_hash = block.hash();
281 let block_number = block.number();
282 let base_fee_per_gas = block.base_fee_per_gas();
283 if let Some((signer, tx)) = block.transactions_with_sender().nth(index) {
284 let tx_info = TransactionInfo {
285 hash: Some(*tx.tx_hash()),
286 block_hash: Some(block_hash),
287 block_number: Some(block_number),
288 base_fee: base_fee_per_gas,
289 index: Some(index as u64),
290 };
291
292 return Ok(Some(
293 self.converter().fill(tx.clone().with_signer(*signer), tx_info)?,
294 ))
295 }
296 }
297
298 Ok(None)
299 }
300 }
301
302 fn get_transaction_by_sender_and_nonce(
304 &self,
305 sender: Address,
306 nonce: u64,
307 include_pending: bool,
308 ) -> impl Future<Output = Result<Option<RpcTransaction<Self::NetworkTypes>>, Self::Error>> + Send
309 where
310 Self: LoadBlock + LoadState,
311 {
312 async move {
313 if include_pending &&
315 let Some(tx) =
316 RpcNodeCore::pool(self).get_transaction_by_sender_and_nonce(sender, nonce)
317 {
318 let transaction = tx.transaction.clone_into_consensus();
319 return Ok(Some(self.converter().fill_pending(transaction)?));
320 }
321
322 let highest = self.transaction_count(sender, None).await?.saturating_to::<u64>();
326
327 if nonce >= highest {
330 return Ok(None);
331 }
332
333 let Ok(high) = self.provider().best_block_number() else {
334 return Err(EthApiError::HeaderNotFound(BlockNumberOrTag::Latest.into()).into());
335 };
336
337 let num = binary_search::<_, _, Self::Error>(1, high, |mid| async move {
340 let mid_nonce =
341 self.transaction_count(sender, Some(mid.into())).await?.saturating_to::<u64>();
342
343 Ok(mid_nonce > nonce)
344 })
345 .await?;
346
347 let block_id = num.into();
348 self.recovered_block(block_id)
349 .await?
350 .and_then(|block| {
351 let block_hash = block.hash();
352 let block_number = block.number();
353 let base_fee_per_gas = block.base_fee_per_gas();
354
355 block
356 .transactions_with_sender()
357 .enumerate()
358 .find(|(_, (signer, tx))| **signer == sender && (*tx).nonce() == nonce)
359 .map(|(index, (signer, tx))| {
360 let tx_info = TransactionInfo {
361 hash: Some(*tx.tx_hash()),
362 block_hash: Some(block_hash),
363 block_number: Some(block_number),
364 base_fee: base_fee_per_gas,
365 index: Some(index as u64),
366 };
367 Ok(self.converter().fill(tx.clone().with_signer(*signer), tx_info)?)
368 })
369 })
370 .ok_or(EthApiError::HeaderNotFound(block_id))?
371 .map(Some)
372 }
373 }
374
375 fn raw_transaction_by_block_and_tx_index(
379 &self,
380 block_id: BlockId,
381 index: usize,
382 ) -> impl Future<Output = Result<Option<Bytes>, Self::Error>> + Send
383 where
384 Self: LoadBlock,
385 {
386 async move {
387 if let Some(block) = self.recovered_block(block_id).await? &&
388 let Some(tx) = block.body().transactions().get(index)
389 {
390 return Ok(Some(tx.encoded_2718().into()))
391 }
392
393 Ok(None)
394 }
395 }
396
397 fn send_transaction_request(
400 &self,
401 mut request: RpcTxReq<Self::NetworkTypes>,
402 ) -> impl Future<Output = Result<B256, Self::Error>> + Send
403 where
404 Self: EthApiSpec + LoadBlock + EstimateCall,
405 {
406 async move {
407 let from = match request.as_ref().from() {
408 Some(from) => from,
409 None => return Err(SignError::NoAccount.into_eth_err()),
410 };
411
412 if self.find_signer(&from).is_err() {
413 return Err(SignError::NoAccount.into_eth_err())
414 }
415
416 if request.as_ref().nonce().is_none() {
418 let nonce = self.next_available_nonce(from).await?;
419 request.as_mut().set_nonce(nonce);
420 }
421
422 let chain_id = self.chain_id();
423 request.as_mut().set_chain_id(chain_id.to());
424
425 let estimated_gas =
426 self.estimate_gas_at(request.clone(), BlockId::pending(), None).await?;
427 let gas_limit = estimated_gas;
428 request.as_mut().set_gas_limit(gas_limit.to());
429
430 let transaction = self.sign_request(&from, request).await?.with_signer(from);
431
432 let pool_transaction =
433 <<Self as RpcNodeCore>::Pool as TransactionPool>::Transaction::try_from_consensus(
434 transaction,
435 )
436 .map_err(|e| {
437 Self::Error::from_eth_err(TransactionConversionError::Other(e.to_string()))
438 })?;
439
440 let AddedTransactionOutcome { hash, .. } = self
442 .pool()
443 .add_transaction(TransactionOrigin::Local, pool_transaction)
444 .await
445 .map_err(Self::Error::from_eth_err)?;
446
447 Ok(hash)
448 }
449 }
450
451 fn fill_transaction(
453 &self,
454 mut request: RpcTxReq<Self::NetworkTypes>,
455 ) -> impl Future<Output = Result<FillTransaction<TxTy<Self::Primitives>>, Self::Error>> + Send
456 where
457 Self: EthApiSpec + LoadBlock + EstimateCall + LoadFee,
458 {
459 async move {
460 let from = match request.as_ref().from() {
461 Some(from) => from,
462 None => return Err(SignError::NoAccount.into_eth_err()),
463 };
464
465 if request.as_ref().value().is_none() {
466 request.as_mut().set_value(U256::ZERO);
467 }
468
469 if request.as_ref().nonce().is_none() {
470 let nonce = self.next_available_nonce(from).await?;
471 request.as_mut().set_nonce(nonce);
472 }
473
474 let chain_id = self.chain_id();
475 request.as_mut().set_chain_id(chain_id.to());
476
477 if request.as_ref().has_eip4844_fields() &&
478 request.as_ref().max_fee_per_blob_gas().is_none()
479 {
480 let blob_fee = self.blob_base_fee().await?;
481 request.as_mut().set_max_fee_per_blob_gas(blob_fee.to());
482 }
483
484 if request.as_ref().blob_sidecar().is_some() &&
485 request.as_ref().blob_versioned_hashes.is_none()
486 {
487 request.as_mut().populate_blob_hashes();
488 }
489
490 if request.as_ref().gas_limit().is_none() {
491 let estimated_gas =
492 self.estimate_gas_at(request.clone(), BlockId::pending(), None).await?;
493 request.as_mut().set_gas_limit(estimated_gas.to());
494 }
495
496 if request.as_ref().gas_price().is_none() {
497 let tip = if let Some(tip) = request.as_ref().max_priority_fee_per_gas() {
498 tip
499 } else {
500 let tip = self.suggested_priority_fee().await?.to::<u128>();
501 request.as_mut().set_max_priority_fee_per_gas(tip);
502 tip
503 };
504 if request.as_ref().max_fee_per_gas().is_none() {
505 let header =
506 self.provider().latest_header().map_err(Self::Error::from_eth_err)?;
507 let base_fee = header.and_then(|h| h.base_fee_per_gas()).unwrap_or_default();
508 request.as_mut().set_max_fee_per_gas(base_fee as u128 + tip);
509 }
510 }
511
512 let tx = self.converter().build_simulate_v1_transaction(request)?;
513
514 let raw = tx.encoded_2718().into();
515
516 Ok(FillTransaction { raw, tx })
517 }
518 }
519
520 fn sign_request(
522 &self,
523 from: &Address,
524 txn: RpcTxReq<Self::NetworkTypes>,
525 ) -> impl Future<Output = Result<ProviderTx<Self::Provider>, Self::Error>> + Send {
526 async move {
527 self.find_signer(from)?
528 .sign_transaction(txn, from)
529 .await
530 .map_err(Self::Error::from_eth_err)
531 }
532 }
533
534 fn sign(
536 &self,
537 account: Address,
538 message: Bytes,
539 ) -> impl Future<Output = Result<Bytes, Self::Error>> + Send {
540 async move {
541 Ok(self
542 .find_signer(&account)?
543 .sign(account, &message)
544 .await
545 .map_err(Self::Error::from_eth_err)?
546 .as_bytes()
547 .into())
548 }
549 }
550
551 fn sign_transaction(
554 &self,
555 request: RpcTxReq<Self::NetworkTypes>,
556 ) -> impl Future<Output = Result<Bytes, Self::Error>> + Send {
557 async move {
558 let from = match request.as_ref().from() {
559 Some(from) => from,
560 None => return Err(SignError::NoAccount.into_eth_err()),
561 };
562
563 Ok(self.sign_request(&from, request).await?.encoded_2718().into())
564 }
565 }
566
567 fn sign_typed_data(&self, data: &TypedData, account: Address) -> Result<Bytes, Self::Error> {
569 Ok(self
570 .find_signer(&account)?
571 .sign_typed_data(account, data)
572 .map_err(Self::Error::from_eth_err)?
573 .as_bytes()
574 .into())
575 }
576
577 #[expect(clippy::type_complexity)]
579 fn find_signer(
580 &self,
581 account: &Address,
582 ) -> Result<
583 Box<dyn EthSigner<ProviderTx<Self::Provider>, RpcTxReq<Self::NetworkTypes>> + 'static>,
584 Self::Error,
585 > {
586 self.signers()
587 .read()
588 .iter()
589 .find(|signer| signer.is_signer_for(account))
590 .map(|signer| dyn_clone::clone_box(&**signer))
591 .ok_or_else(|| SignError::NoAccount.into_eth_err())
592 }
593}
594
595pub trait LoadTransaction: SpawnBlocking + FullEthApiTypes + RpcNodeCoreExt {
600 #[expect(clippy::complexity)]
606 fn transaction_by_hash(
607 &self,
608 hash: B256,
609 ) -> impl Future<
610 Output = Result<Option<TransactionSource<ProviderTx<Self::Provider>>>, Self::Error>,
611 > + Send {
612 async move {
613 if let Some((tx, meta)) = self
615 .spawn_blocking_io(move |this| {
616 this.provider()
617 .transaction_by_hash_with_meta(hash)
618 .map_err(Self::Error::from_eth_err)
619 })
620 .await?
621 {
622 let transaction = tx
626 .try_into_recovered_unchecked()
627 .map_err(|_| EthApiError::InvalidTransactionSignature)?;
628
629 return Ok(Some(TransactionSource::Block {
630 transaction,
631 index: meta.index,
632 block_hash: meta.block_hash,
633 block_number: meta.block_number,
634 base_fee: meta.base_fee,
635 }));
636 }
637
638 if let Some(tx) = self.pool().get(&hash).map(|tx| tx.transaction.clone_into_consensus())
640 {
641 return Ok(Some(TransactionSource::Pool(tx.into())));
642 }
643
644 Ok(None)
645 }
646 }
647
648 #[expect(clippy::type_complexity)]
652 fn transaction_by_hash_at(
653 &self,
654 transaction_hash: B256,
655 ) -> impl Future<
656 Output = Result<
657 Option<(TransactionSource<ProviderTx<Self::Provider>>, BlockId)>,
658 Self::Error,
659 >,
660 > + Send {
661 async move {
662 Ok(self.transaction_by_hash(transaction_hash).await?.map(|tx| match tx {
663 tx @ TransactionSource::Pool(_) => (tx, BlockId::pending()),
664 tx @ TransactionSource::Block { block_hash, .. } => {
665 (tx, BlockId::Hash(block_hash.into()))
666 }
667 }))
668 }
669 }
670
671 #[expect(clippy::type_complexity)]
673 fn transaction_and_block(
674 &self,
675 hash: B256,
676 ) -> impl Future<
677 Output = Result<
678 Option<(
679 TransactionSource<ProviderTx<Self::Provider>>,
680 Arc<RecoveredBlock<ProviderBlock<Self::Provider>>>,
681 )>,
682 Self::Error,
683 >,
684 > + Send {
685 async move {
686 let (transaction, at) = match self.transaction_by_hash_at(hash).await? {
687 None => return Ok(None),
688 Some(res) => res,
689 };
690
691 let block_hash = match at {
693 BlockId::Hash(hash) => hash.block_hash,
694 _ => return Ok(None),
695 };
696 let block = self
697 .cache()
698 .get_recovered_block(block_hash)
699 .await
700 .map_err(Self::Error::from_eth_err)?;
701 Ok(block.map(|block| (transaction, block)))
702 }
703 }
704}