reth_rpc/eth/
bundle.rs
1use alloy_consensus::{EnvKzgSettings, Transaction as _};
4use alloy_eips::eip4844::MAX_DATA_GAS_PER_BLOCK;
5use alloy_primitives::{Keccak256, U256};
6use alloy_rpc_types_mev::{EthCallBundle, EthCallBundleResponse, EthCallBundleTransactionResult};
7use jsonrpsee::core::RpcResult;
8use reth_evm::{ConfigureEvm, Evm};
9use reth_primitives_traits::SignedTransaction;
10use reth_revm::{database::StateProviderDatabase, db::CacheDB};
11use reth_rpc_eth_api::{
12 helpers::{Call, EthTransactions, LoadPendingBlock},
13 EthCallBundleApiServer, FromEthApiError, FromEvmError,
14};
15use reth_rpc_eth_types::{utils::recover_raw_transaction, EthApiError, RpcInvalidTransactionError};
16use reth_tasks::pool::BlockingTaskGuard;
17use reth_transaction_pool::{
18 EthBlobTransactionSidecar, EthPoolTransaction, PoolPooledTx, PoolTransaction, TransactionPool,
19};
20use revm::{context_interface::result::ResultAndState, DatabaseCommit, DatabaseRef};
21use std::sync::Arc;
22
23pub struct EthBundle<Eth> {
25 inner: Arc<EthBundleInner<Eth>>,
27}
28
29impl<Eth> EthBundle<Eth> {
30 pub fn new(eth_api: Eth, blocking_task_guard: BlockingTaskGuard) -> Self {
32 Self { inner: Arc::new(EthBundleInner { eth_api, blocking_task_guard }) }
33 }
34
35 pub fn eth_api(&self) -> &Eth {
37 &self.inner.eth_api
38 }
39}
40
41impl<Eth> EthBundle<Eth>
42where
43 Eth: EthTransactions + LoadPendingBlock + Call + 'static,
44{
45 pub async fn call_bundle(
50 &self,
51 bundle: EthCallBundle,
52 ) -> Result<EthCallBundleResponse, Eth::Error> {
53 let EthCallBundle {
54 txs,
55 block_number,
56 coinbase,
57 state_block_number,
58 timeout: _,
59 timestamp,
60 gas_limit,
61 difficulty,
62 base_fee,
63 ..
64 } = bundle;
65 if txs.is_empty() {
66 return Err(EthApiError::InvalidParams(
67 EthBundleError::EmptyBundleTransactions.to_string(),
68 )
69 .into())
70 }
71 if block_number == 0 {
72 return Err(EthApiError::InvalidParams(
73 EthBundleError::BundleMissingBlockNumber.to_string(),
74 )
75 .into())
76 }
77
78 let transactions = txs
79 .into_iter()
80 .map(|tx| recover_raw_transaction::<PoolPooledTx<Eth::Pool>>(&tx))
81 .collect::<Result<Vec<_>, _>>()?
82 .into_iter()
83 .collect::<Vec<_>>();
84
85 if transactions.iter().filter_map(|tx| tx.blob_gas_used()).sum::<u64>() >
88 MAX_DATA_GAS_PER_BLOCK
89 {
90 return Err(EthApiError::InvalidParams(
91 EthBundleError::Eip4844BlobGasExceeded.to_string(),
92 )
93 .into())
94 }
95
96 let block_id: alloy_rpc_types_eth::BlockId = state_block_number.into();
97 let (mut evm_env, at) = self.eth_api().evm_env_at(block_id).await?;
99
100 if let Some(coinbase) = coinbase {
101 evm_env.block_env.beneficiary = coinbase;
102 }
103
104 if let Some(timestamp) = timestamp {
106 evm_env.block_env.timestamp = timestamp;
107 } else {
108 evm_env.block_env.timestamp += 12;
109 }
110
111 if let Some(difficulty) = difficulty {
112 evm_env.block_env.difficulty = U256::from(difficulty);
113 }
114
115 evm_env.block_env.gas_limit = self.inner.eth_api.call_gas_limit();
117 if let Some(gas_limit) = gas_limit {
118 if gas_limit > evm_env.block_env.gas_limit {
119 return Err(
120 EthApiError::InvalidTransaction(RpcInvalidTransactionError::GasTooHigh).into()
121 )
122 }
123 evm_env.block_env.gas_limit = gas_limit;
124 }
125
126 if let Some(base_fee) = base_fee {
127 evm_env.block_env.basefee = base_fee.try_into().unwrap_or(u64::MAX);
128 }
129
130 let state_block_number = evm_env.block_env.number;
131 evm_env.block_env.number = block_number;
133
134 let eth_api = self.eth_api().clone();
135
136 self.eth_api()
137 .spawn_with_state_at_block(at, move |state| {
138 let coinbase = evm_env.block_env.beneficiary;
139 let basefee = evm_env.block_env.basefee;
140 let db = CacheDB::new(StateProviderDatabase::new(state));
141
142 let initial_coinbase = db
143 .basic_ref(coinbase)
144 .map_err(Eth::Error::from_eth_err)?
145 .map(|acc| acc.balance)
146 .unwrap_or_default();
147 let mut coinbase_balance_before_tx = initial_coinbase;
148 let mut coinbase_balance_after_tx = initial_coinbase;
149 let mut total_gas_used = 0u64;
150 let mut total_gas_fees = U256::ZERO;
151 let mut hasher = Keccak256::new();
152
153 let mut evm = eth_api.evm_config().evm_with_env(db, evm_env);
154
155 let mut results = Vec::with_capacity(transactions.len());
156 let mut transactions = transactions.into_iter().peekable();
157
158 while let Some(tx) = transactions.next() {
159 let signer = tx.signer();
160 let tx = {
161 let mut tx = <Eth::Pool as TransactionPool>::Transaction::from_pooled(tx);
162
163 if let EthBlobTransactionSidecar::Present(sidecar) = tx.take_blob() {
164 tx.validate_blob(&sidecar, EnvKzgSettings::Default.get()).map_err(
165 |e| {
166 Eth::Error::from_eth_err(EthApiError::InvalidParams(
167 e.to_string(),
168 ))
169 },
170 )?;
171 }
172
173 tx.into_consensus()
174 };
175
176 hasher.update(*tx.tx_hash());
177 let ResultAndState { result, state } = evm
178 .transact(eth_api.evm_config().tx_env(&tx))
179 .map_err(Eth::Error::from_evm_err)?;
180
181 let gas_price = tx
182 .effective_tip_per_gas(basefee)
183 .expect("fee is always valid; execution succeeded");
184 let gas_used = result.gas_used();
185 total_gas_used += gas_used;
186
187 let gas_fees = U256::from(gas_used) * U256::from(gas_price);
188 total_gas_fees += gas_fees;
189
190 coinbase_balance_after_tx =
192 state.get(&coinbase).map(|acc| acc.info.balance).unwrap_or_default();
193 let coinbase_diff =
194 coinbase_balance_after_tx.saturating_sub(coinbase_balance_before_tx);
195 let eth_sent_to_coinbase = coinbase_diff.saturating_sub(gas_fees);
196
197 coinbase_balance_before_tx = coinbase_balance_after_tx;
199
200 let (value, revert) = if result.is_success() {
202 let value = result.into_output().unwrap_or_default();
203 (Some(value), None)
204 } else {
205 let revert = result.into_output().unwrap_or_default();
206 (None, Some(revert))
207 };
208
209 let tx_res = EthCallBundleTransactionResult {
210 coinbase_diff,
211 eth_sent_to_coinbase,
212 from_address: signer,
213 gas_fees,
214 gas_price: U256::from(gas_price),
215 gas_used,
216 to_address: tx.to(),
217 tx_hash: *tx.tx_hash(),
218 value,
219 revert,
220 };
221 results.push(tx_res);
222
223 if transactions.peek().is_some() {
226 evm.db_mut().commit(state)
229 }
230 }
231
232 let coinbase_diff = coinbase_balance_after_tx.saturating_sub(initial_coinbase);
235 let eth_sent_to_coinbase = coinbase_diff.saturating_sub(total_gas_fees);
236 let bundle_gas_price =
237 coinbase_diff.checked_div(U256::from(total_gas_used)).unwrap_or_default();
238 let res = EthCallBundleResponse {
239 bundle_gas_price,
240 bundle_hash: hasher.finalize(),
241 coinbase_diff,
242 eth_sent_to_coinbase,
243 gas_fees: total_gas_fees,
244 results,
245 state_block_number,
246 total_gas_used,
247 };
248
249 Ok(res)
250 })
251 .await
252 }
253}
254
255#[async_trait::async_trait]
256impl<Eth> EthCallBundleApiServer for EthBundle<Eth>
257where
258 Eth: EthTransactions + LoadPendingBlock + Call + 'static,
259{
260 async fn call_bundle(&self, request: EthCallBundle) -> RpcResult<EthCallBundleResponse> {
261 Self::call_bundle(self, request).await.map_err(Into::into)
262 }
263}
264
265#[derive(Debug)]
267struct EthBundleInner<Eth> {
268 eth_api: Eth,
270 #[allow(dead_code)]
272 blocking_task_guard: BlockingTaskGuard,
273}
274
275impl<Eth> std::fmt::Debug for EthBundle<Eth> {
276 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
277 f.debug_struct("EthBundle").finish_non_exhaustive()
278 }
279}
280
281impl<Eth> Clone for EthBundle<Eth> {
282 fn clone(&self) -> Self {
283 Self { inner: Arc::clone(&self.inner) }
284 }
285}
286
287#[derive(Debug, thiserror::Error)]
289pub enum EthBundleError {
290 #[error("bundle missing txs")]
292 EmptyBundleTransactions,
293 #[error("bundle missing blockNumber")]
295 BundleMissingBlockNumber,
296 #[error("blob gas usage exceeds the limit of {MAX_DATA_GAS_PER_BLOCK} gas per block.")]
298 Eip4844BlobGasExceeded,
299}