reth_rpc_eth_api/helpers/
transaction.rs1use super::{EthApiSpec, EthSigner, LoadBlock, LoadReceipt, LoadState, SpawnBlocking};
5use crate::{
6 helpers::{estimate::EstimateCall, spec::SignersForRpc},
7 FromEthApiError, FullEthApiTypes, IntoEthApiError, RpcNodeCore, RpcNodeCoreExt, RpcReceipt,
8 RpcTransaction,
9};
10use alloy_consensus::{
11 transaction::{SignerRecoverable, TransactionMeta, TxHashRef},
12 BlockHeader, Transaction,
13};
14use alloy_dyn_abi::TypedData;
15use alloy_eips::{eip2718::Encodable2718, BlockId};
16use alloy_network::TransactionBuilder;
17use alloy_primitives::{Address, Bytes, TxHash, B256};
18use alloy_rpc_types_eth::{BlockNumberOrTag, TransactionInfo};
19use futures::{Future, StreamExt};
20use reth_chain_state::CanonStateSubscriptions;
21use reth_node_api::BlockBody;
22use reth_primitives_traits::{RecoveredBlock, SignedTransaction};
23use reth_rpc_convert::{transaction::RpcConvert, RpcTxReq};
24use reth_rpc_eth_types::{
25 utils::binary_search, EthApiError, EthApiError::TransactionConfirmationTimeout, SignError,
26 TransactionSource,
27};
28use reth_storage_api::{
29 BlockNumReader, BlockReaderIdExt, ProviderBlock, ProviderReceipt, ProviderTx, ReceiptProvider,
30 TransactionsProvider,
31};
32use reth_transaction_pool::{
33 AddedTransactionOutcome, PoolTransaction, TransactionOrigin, TransactionPool,
34};
35use std::{sync::Arc, time::Duration};
36
37pub trait EthTransactions: LoadTransaction<Provider: BlockReaderIdExt> {
60 fn signers(&self) -> &SignersForRpc<Self::Provider, Self::NetworkTypes>;
64
65 fn accounts(&self) -> Vec<Address> {
67 self.signers().read().iter().flat_map(|s| s.accounts()).collect()
68 }
69
70 fn send_raw_transaction_sync_timeout(&self) -> Duration;
72
73 fn send_raw_transaction(
77 &self,
78 tx: Bytes,
79 ) -> impl Future<Output = Result<B256, Self::Error>> + Send;
80
81 fn send_raw_transaction_sync(
85 &self,
86 tx: Bytes,
87 ) -> impl Future<Output = Result<RpcReceipt<Self::NetworkTypes>, Self::Error>> + Send
88 where
89 Self: LoadReceipt + 'static,
90 {
91 let this = self.clone();
92 let timeout_duration = self.send_raw_transaction_sync_timeout();
93 async move {
94 let hash = EthTransactions::send_raw_transaction(&this, tx).await?;
95 let mut stream = this.provider().canonical_state_stream();
96 tokio::time::timeout(timeout_duration, async {
97 while let Some(notification) = stream.next().await {
98 let chain = notification.committed();
99 for block in chain.blocks_iter() {
100 if block.body().contains_transaction(&hash) &&
101 let Some(receipt) = this.transaction_receipt(hash).await?
102 {
103 return Ok(receipt);
104 }
105 }
106 }
107 Err(Self::Error::from_eth_err(TransactionConfirmationTimeout {
108 hash,
109 duration: timeout_duration,
110 }))
111 })
112 .await
113 .unwrap_or_else(|_elapsed| {
114 Err(Self::Error::from_eth_err(TransactionConfirmationTimeout {
115 hash,
116 duration: timeout_duration,
117 }))
118 })
119 }
120 }
121
122 #[expect(clippy::complexity)]
128 fn transaction_by_hash(
129 &self,
130 hash: B256,
131 ) -> impl Future<
132 Output = Result<Option<TransactionSource<ProviderTx<Self::Provider>>>, Self::Error>,
133 > + Send {
134 LoadTransaction::transaction_by_hash(self, hash)
135 }
136
137 #[expect(clippy::type_complexity)]
141 fn transactions_by_block(
142 &self,
143 block: B256,
144 ) -> impl Future<Output = Result<Option<Vec<ProviderTx<Self::Provider>>>, Self::Error>> + Send
145 {
146 async move {
147 self.cache()
148 .get_recovered_block(block)
149 .await
150 .map(|b| b.map(|b| b.body().transactions().to_vec()))
151 .map_err(Self::Error::from_eth_err)
152 }
153 }
154
155 fn raw_transaction_by_hash(
163 &self,
164 hash: B256,
165 ) -> impl Future<Output = Result<Option<Bytes>, Self::Error>> + Send {
166 async move {
167 if let Some(tx) =
169 self.pool().get_pooled_transaction_element(hash).map(|tx| tx.encoded_2718().into())
170 {
171 return Ok(Some(tx))
172 }
173
174 self.spawn_blocking_io(move |ref this| {
175 Ok(this
176 .provider()
177 .transaction_by_hash(hash)
178 .map_err(Self::Error::from_eth_err)?
179 .map(|tx| tx.encoded_2718().into()))
180 })
181 .await
182 }
183 }
184
185 #[expect(clippy::type_complexity)]
187 fn historical_transaction_by_hash_at(
188 &self,
189 hash: B256,
190 ) -> impl Future<
191 Output = Result<Option<(TransactionSource<ProviderTx<Self::Provider>>, B256)>, Self::Error>,
192 > + Send {
193 async move {
194 match self.transaction_by_hash_at(hash).await? {
195 None => Ok(None),
196 Some((tx, at)) => Ok(at.as_block_hash().map(|hash| (tx, hash))),
197 }
198 }
199 }
200
201 fn transaction_receipt(
206 &self,
207 hash: B256,
208 ) -> impl Future<Output = Result<Option<RpcReceipt<Self::NetworkTypes>>, Self::Error>> + Send
209 where
210 Self: LoadReceipt + 'static,
211 {
212 async move {
213 match self.load_transaction_and_receipt(hash).await? {
214 Some((tx, meta, receipt)) => {
215 self.build_transaction_receipt(tx, meta, receipt).await.map(Some)
216 }
217 None => Ok(None),
218 }
219 }
220 }
221
222 #[expect(clippy::complexity)]
224 fn load_transaction_and_receipt(
225 &self,
226 hash: TxHash,
227 ) -> impl Future<
228 Output = Result<
229 Option<(ProviderTx<Self::Provider>, TransactionMeta, ProviderReceipt<Self::Provider>)>,
230 Self::Error,
231 >,
232 > + Send
233 where
234 Self: 'static,
235 {
236 self.spawn_blocking_io(move |this| {
237 let provider = this.provider();
238 let (tx, meta) = match provider
239 .transaction_by_hash_with_meta(hash)
240 .map_err(Self::Error::from_eth_err)?
241 {
242 Some((tx, meta)) => (tx, meta),
243 None => return Ok(None),
244 };
245
246 let receipt = match provider.receipt_by_hash(hash).map_err(Self::Error::from_eth_err)? {
247 Some(recpt) => recpt,
248 None => return Ok(None),
249 };
250
251 Ok(Some((tx, meta, receipt)))
252 })
253 }
254
255 fn transaction_by_block_and_tx_index(
259 &self,
260 block_id: BlockId,
261 index: usize,
262 ) -> impl Future<Output = Result<Option<RpcTransaction<Self::NetworkTypes>>, Self::Error>> + Send
263 where
264 Self: LoadBlock,
265 {
266 async move {
267 if let Some(block) = self.recovered_block(block_id).await? {
268 let block_hash = block.hash();
269 let block_number = block.number();
270 let base_fee_per_gas = block.base_fee_per_gas();
271 if let Some((signer, tx)) = block.transactions_with_sender().nth(index) {
272 let tx_info = TransactionInfo {
273 hash: Some(*tx.tx_hash()),
274 block_hash: Some(block_hash),
275 block_number: Some(block_number),
276 base_fee: base_fee_per_gas,
277 index: Some(index as u64),
278 };
279
280 return Ok(Some(
281 self.tx_resp_builder().fill(tx.clone().with_signer(*signer), tx_info)?,
282 ))
283 }
284 }
285
286 Ok(None)
287 }
288 }
289
290 fn get_transaction_by_sender_and_nonce(
292 &self,
293 sender: Address,
294 nonce: u64,
295 include_pending: bool,
296 ) -> impl Future<Output = Result<Option<RpcTransaction<Self::NetworkTypes>>, Self::Error>> + Send
297 where
298 Self: LoadBlock + LoadState,
299 {
300 async move {
301 if include_pending &&
303 let Some(tx) =
304 RpcNodeCore::pool(self).get_transaction_by_sender_and_nonce(sender, nonce)
305 {
306 let transaction = tx.transaction.clone_into_consensus();
307 return Ok(Some(self.tx_resp_builder().fill_pending(transaction)?));
308 }
309
310 if !self.get_code(sender, None).await?.is_empty() {
312 return Ok(None);
313 }
314
315 let highest = self.transaction_count(sender, None).await?.saturating_to::<u64>();
316
317 if nonce >= highest {
320 return Ok(None);
321 }
322
323 let Ok(high) = self.provider().best_block_number() else {
324 return Err(EthApiError::HeaderNotFound(BlockNumberOrTag::Latest.into()).into());
325 };
326
327 let num = binary_search::<_, _, Self::Error>(1, high, |mid| async move {
330 let mid_nonce =
331 self.transaction_count(sender, Some(mid.into())).await?.saturating_to::<u64>();
332
333 Ok(mid_nonce > nonce)
334 })
335 .await?;
336
337 let block_id = num.into();
338 self.recovered_block(block_id)
339 .await?
340 .and_then(|block| {
341 let block_hash = block.hash();
342 let block_number = block.number();
343 let base_fee_per_gas = block.base_fee_per_gas();
344
345 block
346 .transactions_with_sender()
347 .enumerate()
348 .find(|(_, (signer, tx))| **signer == sender && (*tx).nonce() == nonce)
349 .map(|(index, (signer, tx))| {
350 let tx_info = TransactionInfo {
351 hash: Some(*tx.tx_hash()),
352 block_hash: Some(block_hash),
353 block_number: Some(block_number),
354 base_fee: base_fee_per_gas,
355 index: Some(index as u64),
356 };
357 self.tx_resp_builder().fill(tx.clone().with_signer(*signer), tx_info)
358 })
359 })
360 .ok_or(EthApiError::HeaderNotFound(block_id))?
361 .map(Some)
362 }
363 }
364
365 fn raw_transaction_by_block_and_tx_index(
369 &self,
370 block_id: BlockId,
371 index: usize,
372 ) -> impl Future<Output = Result<Option<Bytes>, Self::Error>> + Send
373 where
374 Self: LoadBlock,
375 {
376 async move {
377 if let Some(block) = self.recovered_block(block_id).await? &&
378 let Some(tx) = block.body().transactions().get(index)
379 {
380 return Ok(Some(tx.encoded_2718().into()))
381 }
382
383 Ok(None)
384 }
385 }
386
387 fn send_transaction(
390 &self,
391 mut request: RpcTxReq<Self::NetworkTypes>,
392 ) -> impl Future<Output = Result<B256, Self::Error>> + Send
393 where
394 Self: EthApiSpec + LoadBlock + EstimateCall,
395 {
396 async move {
397 let from = match request.as_ref().from() {
398 Some(from) => from,
399 None => return Err(SignError::NoAccount.into_eth_err()),
400 };
401
402 if self.find_signer(&from).is_err() {
403 return Err(SignError::NoAccount.into_eth_err())
404 }
405
406 if request.as_ref().nonce().is_none() {
408 let nonce = self.next_available_nonce(from).await?;
409 request.as_mut().set_nonce(nonce);
410 }
411
412 let chain_id = self.chain_id();
413 request.as_mut().set_chain_id(chain_id.to());
414
415 let estimated_gas =
416 self.estimate_gas_at(request.clone(), BlockId::pending(), None).await?;
417 let gas_limit = estimated_gas;
418 request.as_mut().set_gas_limit(gas_limit.to());
419
420 let transaction = self.sign_request(&from, request).await?.with_signer(from);
421
422 let pool_transaction =
423 <<Self as RpcNodeCore>::Pool as TransactionPool>::Transaction::try_from_consensus(
424 transaction,
425 )
426 .map_err(|_| EthApiError::TransactionConversionError)?;
427
428 let AddedTransactionOutcome { hash, .. } = self
430 .pool()
431 .add_transaction(TransactionOrigin::Local, pool_transaction)
432 .await
433 .map_err(Self::Error::from_eth_err)?;
434
435 Ok(hash)
436 }
437 }
438
439 fn sign_request(
441 &self,
442 from: &Address,
443 txn: RpcTxReq<Self::NetworkTypes>,
444 ) -> impl Future<Output = Result<ProviderTx<Self::Provider>, Self::Error>> + Send {
445 async move {
446 self.find_signer(from)?
447 .sign_transaction(txn, from)
448 .await
449 .map_err(Self::Error::from_eth_err)
450 }
451 }
452
453 fn sign(
455 &self,
456 account: Address,
457 message: Bytes,
458 ) -> impl Future<Output = Result<Bytes, Self::Error>> + Send {
459 async move {
460 Ok(self
461 .find_signer(&account)?
462 .sign(account, &message)
463 .await
464 .map_err(Self::Error::from_eth_err)?
465 .as_bytes()
466 .into())
467 }
468 }
469
470 fn sign_transaction(
473 &self,
474 request: RpcTxReq<Self::NetworkTypes>,
475 ) -> impl Future<Output = Result<Bytes, Self::Error>> + Send {
476 async move {
477 let from = match request.as_ref().from() {
478 Some(from) => from,
479 None => return Err(SignError::NoAccount.into_eth_err()),
480 };
481
482 Ok(self.sign_request(&from, request).await?.encoded_2718().into())
483 }
484 }
485
486 fn sign_typed_data(&self, data: &TypedData, account: Address) -> Result<Bytes, Self::Error> {
488 Ok(self
489 .find_signer(&account)?
490 .sign_typed_data(account, data)
491 .map_err(Self::Error::from_eth_err)?
492 .as_bytes()
493 .into())
494 }
495
496 #[expect(clippy::type_complexity)]
498 fn find_signer(
499 &self,
500 account: &Address,
501 ) -> Result<
502 Box<dyn EthSigner<ProviderTx<Self::Provider>, RpcTxReq<Self::NetworkTypes>> + 'static>,
503 Self::Error,
504 > {
505 self.signers()
506 .read()
507 .iter()
508 .find(|signer| signer.is_signer_for(account))
509 .map(|signer| dyn_clone::clone_box(&**signer))
510 .ok_or_else(|| SignError::NoAccount.into_eth_err())
511 }
512}
513
514pub trait LoadTransaction: SpawnBlocking + FullEthApiTypes + RpcNodeCoreExt {
519 #[expect(clippy::complexity)]
525 fn transaction_by_hash(
526 &self,
527 hash: B256,
528 ) -> impl Future<
529 Output = Result<Option<TransactionSource<ProviderTx<Self::Provider>>>, Self::Error>,
530 > + Send {
531 async move {
532 let mut resp = self
534 .spawn_blocking_io(move |this| {
535 match this
536 .provider()
537 .transaction_by_hash_with_meta(hash)
538 .map_err(Self::Error::from_eth_err)?
539 {
540 None => Ok(None),
541 Some((tx, meta)) => {
542 let transaction = tx
546 .try_into_recovered_unchecked()
547 .map_err(|_| EthApiError::InvalidTransactionSignature)?;
548
549 let tx = TransactionSource::Block {
550 transaction,
551 index: meta.index,
552 block_hash: meta.block_hash,
553 block_number: meta.block_number,
554 base_fee: meta.base_fee,
555 };
556 Ok(Some(tx))
557 }
558 }
559 })
560 .await?;
561
562 if resp.is_none() {
563 if let Some(tx) =
565 self.pool().get(&hash).map(|tx| tx.transaction.clone().into_consensus())
566 {
567 resp = Some(TransactionSource::Pool(tx.into()));
568 }
569 }
570
571 Ok(resp)
572 }
573 }
574
575 #[expect(clippy::type_complexity)]
579 fn transaction_by_hash_at(
580 &self,
581 transaction_hash: B256,
582 ) -> impl Future<
583 Output = Result<
584 Option<(TransactionSource<ProviderTx<Self::Provider>>, BlockId)>,
585 Self::Error,
586 >,
587 > + Send {
588 async move {
589 Ok(self.transaction_by_hash(transaction_hash).await?.map(|tx| match tx {
590 tx @ TransactionSource::Pool(_) => (tx, BlockId::pending()),
591 tx @ TransactionSource::Block { block_hash, .. } => {
592 (tx, BlockId::Hash(block_hash.into()))
593 }
594 }))
595 }
596 }
597
598 #[expect(clippy::type_complexity)]
600 fn transaction_and_block(
601 &self,
602 hash: B256,
603 ) -> impl Future<
604 Output = Result<
605 Option<(
606 TransactionSource<ProviderTx<Self::Provider>>,
607 Arc<RecoveredBlock<ProviderBlock<Self::Provider>>>,
608 )>,
609 Self::Error,
610 >,
611 > + Send {
612 async move {
613 let (transaction, at) = match self.transaction_by_hash_at(hash).await? {
614 None => return Ok(None),
615 Some(res) => res,
616 };
617
618 let block_hash = match at {
620 BlockId::Hash(hash) => hash.block_hash,
621 _ => return Ok(None),
622 };
623 let block = self
624 .cache()
625 .get_recovered_block(block_hash)
626 .await
627 .map_err(Self::Error::from_eth_err)?;
628 Ok(block.map(|block| (transaction, block)))
629 }
630 }
631}