reth_rpc/eth/
bundle.rs
1use alloy_consensus::{EnvKzgSettings, Transaction as _};
4use alloy_eips::eip7840::BlobParams;
5use alloy_primitives::{Keccak256, U256};
6use alloy_rpc_types_mev::{EthCallBundle, EthCallBundleResponse, EthCallBundleTransactionResult};
7use jsonrpsee::core::RpcResult;
8use reth_chainspec::{ChainSpecProvider, EthChainSpec};
9use reth_evm::{ConfigureEvm, Evm};
10use reth_primitives_traits::SignedTransaction;
11use reth_revm::{database::StateProviderDatabase, db::CacheDB};
12use reth_rpc_eth_api::{
13 helpers::{Call, EthTransactions, LoadPendingBlock},
14 EthCallBundleApiServer, FromEthApiError, FromEvmError,
15};
16use reth_rpc_eth_types::{utils::recover_raw_transaction, EthApiError, RpcInvalidTransactionError};
17use reth_tasks::pool::BlockingTaskGuard;
18use reth_transaction_pool::{
19 EthBlobTransactionSidecar, EthPoolTransaction, PoolPooledTx, PoolTransaction, TransactionPool,
20};
21use revm::{context_interface::result::ResultAndState, DatabaseCommit, DatabaseRef};
22use std::sync::Arc;
23
24pub struct EthBundle<Eth> {
26 inner: Arc<EthBundleInner<Eth>>,
28}
29
30impl<Eth> EthBundle<Eth> {
31 pub fn new(eth_api: Eth, blocking_task_guard: BlockingTaskGuard) -> Self {
33 Self { inner: Arc::new(EthBundleInner { eth_api, blocking_task_guard }) }
34 }
35
36 pub fn eth_api(&self) -> &Eth {
38 &self.inner.eth_api
39 }
40}
41
42impl<Eth> EthBundle<Eth>
43where
44 Eth: EthTransactions + LoadPendingBlock + Call + 'static,
45{
46 pub async fn call_bundle(
51 &self,
52 bundle: EthCallBundle,
53 ) -> Result<EthCallBundleResponse, Eth::Error> {
54 let EthCallBundle {
55 txs,
56 block_number,
57 coinbase,
58 state_block_number,
59 timeout: _,
60 timestamp,
61 gas_limit,
62 difficulty,
63 base_fee,
64 ..
65 } = bundle;
66 if txs.is_empty() {
67 return Err(EthApiError::InvalidParams(
68 EthBundleError::EmptyBundleTransactions.to_string(),
69 )
70 .into())
71 }
72 if block_number == 0 {
73 return Err(EthApiError::InvalidParams(
74 EthBundleError::BundleMissingBlockNumber.to_string(),
75 )
76 .into())
77 }
78
79 let transactions = txs
80 .into_iter()
81 .map(|tx| recover_raw_transaction::<PoolPooledTx<Eth::Pool>>(&tx))
82 .collect::<Result<Vec<_>, _>>()?
83 .into_iter()
84 .collect::<Vec<_>>();
85
86 let block_id: alloy_rpc_types_eth::BlockId = state_block_number.into();
87 let (mut evm_env, at) = self.eth_api().evm_env_at(block_id).await?;
89
90 if let Some(coinbase) = coinbase {
91 evm_env.block_env.beneficiary = coinbase;
92 }
93
94 if let Some(timestamp) = timestamp {
96 evm_env.block_env.timestamp = timestamp;
97 } else {
98 evm_env.block_env.timestamp += 12;
99 }
100
101 if let Some(difficulty) = difficulty {
102 evm_env.block_env.difficulty = U256::from(difficulty);
103 }
104
105 let blob_gas_used = transactions.iter().filter_map(|tx| tx.blob_gas_used()).sum::<u64>();
108 if blob_gas_used > 0 {
109 let blob_params = self
110 .eth_api()
111 .provider()
112 .chain_spec()
113 .blob_params_at_timestamp(evm_env.block_env.timestamp)
114 .unwrap_or_else(BlobParams::cancun);
115 if transactions.iter().filter_map(|tx| tx.blob_gas_used()).sum::<u64>() >
116 blob_params.max_blob_gas_per_block()
117 {
118 return Err(EthApiError::InvalidParams(
119 EthBundleError::Eip4844BlobGasExceeded(blob_params.max_blob_gas_per_block())
120 .to_string(),
121 )
122 .into())
123 }
124 }
125
126 evm_env.block_env.gas_limit = self.inner.eth_api.call_gas_limit();
128 if let Some(gas_limit) = gas_limit {
129 if gas_limit > evm_env.block_env.gas_limit {
130 return Err(
131 EthApiError::InvalidTransaction(RpcInvalidTransactionError::GasTooHigh).into()
132 )
133 }
134 evm_env.block_env.gas_limit = gas_limit;
135 }
136
137 if let Some(base_fee) = base_fee {
138 evm_env.block_env.basefee = base_fee.try_into().unwrap_or(u64::MAX);
139 }
140
141 let state_block_number = evm_env.block_env.number;
142 evm_env.block_env.number = block_number;
144
145 let eth_api = self.eth_api().clone();
146
147 self.eth_api()
148 .spawn_with_state_at_block(at, move |state| {
149 let coinbase = evm_env.block_env.beneficiary;
150 let basefee = evm_env.block_env.basefee;
151 let db = CacheDB::new(StateProviderDatabase::new(state));
152
153 let initial_coinbase = db
154 .basic_ref(coinbase)
155 .map_err(Eth::Error::from_eth_err)?
156 .map(|acc| acc.balance)
157 .unwrap_or_default();
158 let mut coinbase_balance_before_tx = initial_coinbase;
159 let mut coinbase_balance_after_tx = initial_coinbase;
160 let mut total_gas_used = 0u64;
161 let mut total_gas_fees = U256::ZERO;
162 let mut hasher = Keccak256::new();
163
164 let mut evm = eth_api.evm_config().evm_with_env(db, evm_env);
165
166 let mut results = Vec::with_capacity(transactions.len());
167 let mut transactions = transactions.into_iter().peekable();
168
169 while let Some(tx) = transactions.next() {
170 let signer = tx.signer();
171 let tx = {
172 let mut tx = <Eth::Pool as TransactionPool>::Transaction::from_pooled(tx);
173
174 if let EthBlobTransactionSidecar::Present(sidecar) = tx.take_blob() {
175 tx.validate_blob(&sidecar, EnvKzgSettings::Default.get()).map_err(
176 |e| {
177 Eth::Error::from_eth_err(EthApiError::InvalidParams(
178 e.to_string(),
179 ))
180 },
181 )?;
182 }
183
184 tx.into_consensus()
185 };
186
187 hasher.update(*tx.tx_hash());
188 let ResultAndState { result, state } = evm
189 .transact(eth_api.evm_config().tx_env(&tx))
190 .map_err(Eth::Error::from_evm_err)?;
191
192 let gas_price = tx
193 .effective_tip_per_gas(basefee)
194 .expect("fee is always valid; execution succeeded");
195 let gas_used = result.gas_used();
196 total_gas_used += gas_used;
197
198 let gas_fees = U256::from(gas_used) * U256::from(gas_price);
199 total_gas_fees += gas_fees;
200
201 coinbase_balance_after_tx =
203 state.get(&coinbase).map(|acc| acc.info.balance).unwrap_or_default();
204 let coinbase_diff =
205 coinbase_balance_after_tx.saturating_sub(coinbase_balance_before_tx);
206 let eth_sent_to_coinbase = coinbase_diff.saturating_sub(gas_fees);
207
208 coinbase_balance_before_tx = coinbase_balance_after_tx;
210
211 let (value, revert) = if result.is_success() {
213 let value = result.into_output().unwrap_or_default();
214 (Some(value), None)
215 } else {
216 let revert = result.into_output().unwrap_or_default();
217 (None, Some(revert))
218 };
219
220 let tx_res = EthCallBundleTransactionResult {
221 coinbase_diff,
222 eth_sent_to_coinbase,
223 from_address: signer,
224 gas_fees,
225 gas_price: U256::from(gas_price),
226 gas_used,
227 to_address: tx.to(),
228 tx_hash: *tx.tx_hash(),
229 value,
230 revert,
231 };
232 results.push(tx_res);
233
234 if transactions.peek().is_some() {
237 evm.db_mut().commit(state)
240 }
241 }
242
243 let coinbase_diff = coinbase_balance_after_tx.saturating_sub(initial_coinbase);
246 let eth_sent_to_coinbase = coinbase_diff.saturating_sub(total_gas_fees);
247 let bundle_gas_price =
248 coinbase_diff.checked_div(U256::from(total_gas_used)).unwrap_or_default();
249 let res = EthCallBundleResponse {
250 bundle_gas_price,
251 bundle_hash: hasher.finalize(),
252 coinbase_diff,
253 eth_sent_to_coinbase,
254 gas_fees: total_gas_fees,
255 results,
256 state_block_number,
257 total_gas_used,
258 };
259
260 Ok(res)
261 })
262 .await
263 }
264}
265
266#[async_trait::async_trait]
267impl<Eth> EthCallBundleApiServer for EthBundle<Eth>
268where
269 Eth: EthTransactions + LoadPendingBlock + Call + 'static,
270{
271 async fn call_bundle(&self, request: EthCallBundle) -> RpcResult<EthCallBundleResponse> {
272 Self::call_bundle(self, request).await.map_err(Into::into)
273 }
274}
275
276#[derive(Debug)]
278struct EthBundleInner<Eth> {
279 eth_api: Eth,
281 #[expect(dead_code)]
283 blocking_task_guard: BlockingTaskGuard,
284}
285
286impl<Eth> std::fmt::Debug for EthBundle<Eth> {
287 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
288 f.debug_struct("EthBundle").finish_non_exhaustive()
289 }
290}
291
292impl<Eth> Clone for EthBundle<Eth> {
293 fn clone(&self) -> Self {
294 Self { inner: Arc::clone(&self.inner) }
295 }
296}
297
298#[derive(Debug, thiserror::Error)]
300pub enum EthBundleError {
301 #[error("bundle missing txs")]
303 EmptyBundleTransactions,
304 #[error("bundle missing blockNumber")]
306 BundleMissingBlockNumber,
307 #[error("blob gas usage exceeds the limit of {0} gas per block.")]
309 Eip4844BlobGasExceeded(u64),
310}