1use alloy_consensus::{transaction::TxHashRef, EnvKzgSettings, Transaction as _};
4use alloy_eips::eip7840::BlobParams;
5use alloy_evm::env::BlockEnvironment;
6use alloy_primitives::{uint, Keccak256, U256};
7use alloy_rpc_types_mev::{EthCallBundle, EthCallBundleResponse, EthCallBundleTransactionResult};
8use jsonrpsee::core::RpcResult;
9use reth_chainspec::{ChainSpecProvider, EthChainSpec};
10use reth_evm::{ConfigureEvm, Evm};
11use reth_revm::{database::StateProviderDatabase, State};
12use reth_rpc_eth_api::{
13 helpers::{Call, EthTransactions, LoadPendingBlock},
14 EthCallBundleApiServer, FromEthApiError, FromEvmError,
15};
16use reth_rpc_eth_types::{utils::recover_raw_transaction, EthApiError, RpcInvalidTransactionError};
17use reth_tasks::pool::BlockingTaskGuard;
18use reth_transaction_pool::{
19 EthBlobTransactionSidecar, EthPoolTransaction, PoolPooledTx, PoolTransaction, TransactionPool,
20};
21use revm::{
22 context::Block, context_interface::result::ResultAndState, DatabaseCommit, DatabaseRef,
23};
24use std::sync::Arc;
25
26pub struct EthBundle<Eth> {
28 inner: Arc<EthBundleInner<Eth>>,
30}
31
32impl<Eth> EthBundle<Eth> {
33 pub fn new(eth_api: Eth, blocking_task_guard: BlockingTaskGuard) -> Self {
35 Self { inner: Arc::new(EthBundleInner { eth_api, blocking_task_guard }) }
36 }
37
38 pub fn eth_api(&self) -> &Eth {
40 &self.inner.eth_api
41 }
42}
43
44impl<Eth> EthBundle<Eth>
45where
46 Eth: EthTransactions + LoadPendingBlock + Call + 'static,
47{
48 pub async fn call_bundle(
53 &self,
54 bundle: EthCallBundle,
55 ) -> Result<EthCallBundleResponse, Eth::Error> {
56 let EthCallBundle {
57 txs,
58 block_number,
59 coinbase,
60 state_block_number,
61 timeout: _,
62 timestamp,
63 gas_limit,
64 difficulty,
65 base_fee,
66 ..
67 } = bundle;
68 if txs.is_empty() {
69 return Err(EthApiError::InvalidParams(
70 EthBundleError::EmptyBundleTransactions.to_string(),
71 )
72 .into())
73 }
74 if block_number == 0 {
75 return Err(EthApiError::InvalidParams(
76 EthBundleError::BundleMissingBlockNumber.to_string(),
77 )
78 .into())
79 }
80
81 let transactions = txs
82 .into_iter()
83 .map(|tx| recover_raw_transaction::<PoolPooledTx<Eth::Pool>>(&tx))
84 .collect::<Result<Vec<_>, _>>()?
85 .into_iter()
86 .collect::<Vec<_>>();
87
88 let block_id: alloy_rpc_types_eth::BlockId = state_block_number.into();
89 let (mut evm_env, at) = self.eth_api().evm_env_at(block_id).await?;
91
92 if let Some(coinbase) = coinbase {
93 evm_env.block_env.inner_mut().beneficiary = coinbase;
94 }
95
96 if let Some(timestamp) = timestamp {
98 evm_env.block_env.inner_mut().timestamp = U256::from(timestamp);
99 } else {
100 evm_env.block_env.inner_mut().timestamp += uint!(12_U256);
101 }
102
103 if let Some(difficulty) = difficulty {
104 evm_env.block_env.inner_mut().difficulty = U256::from(difficulty);
105 }
106
107 let blob_gas_used = transactions.iter().filter_map(|tx| tx.blob_gas_used()).sum::<u64>();
110 if blob_gas_used > 0 {
111 let blob_params = self
112 .eth_api()
113 .provider()
114 .chain_spec()
115 .blob_params_at_timestamp(evm_env.block_env.timestamp().saturating_to())
116 .unwrap_or_else(BlobParams::cancun);
117 if transactions.iter().filter_map(|tx| tx.blob_gas_used()).sum::<u64>() >
118 blob_params.max_blob_gas_per_block()
119 {
120 return Err(EthApiError::InvalidParams(
121 EthBundleError::Eip4844BlobGasExceeded(blob_params.max_blob_gas_per_block())
122 .to_string(),
123 )
124 .into())
125 }
126 }
127
128 evm_env.block_env.inner_mut().gas_limit = self.inner.eth_api.call_gas_limit();
130 if let Some(gas_limit) = gas_limit {
131 if gas_limit > evm_env.block_env.gas_limit() {
132 return Err(
133 EthApiError::InvalidTransaction(RpcInvalidTransactionError::GasTooHigh).into()
134 )
135 }
136 evm_env.block_env.inner_mut().gas_limit = gas_limit;
137 }
138
139 if let Some(base_fee) = base_fee {
140 evm_env.block_env.inner_mut().basefee = base_fee.try_into().unwrap_or(u64::MAX);
141 }
142
143 let state_block_number = evm_env.block_env.number();
144 evm_env.block_env.inner_mut().number = U256::from(block_number);
146
147 let eth_api = self.eth_api().clone();
148
149 self.eth_api()
150 .spawn_with_state_at_block(at, move |state| {
151 let coinbase = evm_env.block_env.beneficiary();
152 let basefee = evm_env.block_env.basefee();
153 let db = State::builder().with_database(StateProviderDatabase::new(state)).build();
154
155 let initial_coinbase = db
156 .basic_ref(coinbase)
157 .map_err(Eth::Error::from_eth_err)?
158 .map(|acc| acc.balance)
159 .unwrap_or_default();
160 let mut coinbase_balance_before_tx = initial_coinbase;
161 let mut coinbase_balance_after_tx = initial_coinbase;
162 let mut total_gas_used = 0u64;
163 let mut total_gas_fees = U256::ZERO;
164 let mut hasher = Keccak256::new();
165
166 let mut evm = eth_api.evm_config().evm_with_env(db, evm_env);
167
168 let mut results = Vec::with_capacity(transactions.len());
169 let mut transactions = transactions.into_iter().peekable();
170
171 while let Some(tx) = transactions.next() {
172 let signer = tx.signer();
173 let tx = {
174 let mut tx = <Eth::Pool as TransactionPool>::Transaction::from_pooled(tx);
175
176 if let EthBlobTransactionSidecar::Present(sidecar) = tx.take_blob() {
177 tx.validate_blob(&sidecar, EnvKzgSettings::Default.get()).map_err(
178 |e| {
179 Eth::Error::from_eth_err(EthApiError::InvalidParams(
180 e.to_string(),
181 ))
182 },
183 )?;
184 }
185
186 tx.into_consensus()
187 };
188
189 hasher.update(*tx.tx_hash());
190 let ResultAndState { result, state } = evm
191 .transact(eth_api.evm_config().tx_env(&tx))
192 .map_err(Eth::Error::from_evm_err)?;
193
194 let gas_price = tx
195 .effective_tip_per_gas(basefee)
196 .expect("fee is always valid; execution succeeded");
197 let gas_used = result.gas_used();
198 total_gas_used += gas_used;
199
200 let gas_fees = U256::from(gas_used) * U256::from(gas_price);
201 total_gas_fees += gas_fees;
202
203 coinbase_balance_after_tx =
205 state.get(&coinbase).map(|acc| acc.info.balance).unwrap_or_default();
206 let coinbase_diff =
207 coinbase_balance_after_tx.saturating_sub(coinbase_balance_before_tx);
208 let eth_sent_to_coinbase = coinbase_diff.saturating_sub(gas_fees);
209
210 coinbase_balance_before_tx = coinbase_balance_after_tx;
212
213 let (value, revert) = if result.is_success() {
215 let value = result.into_output().unwrap_or_default();
216 (Some(value), None)
217 } else {
218 let revert = result.into_output().unwrap_or_default();
219 (None, Some(revert))
220 };
221
222 let tx_res = EthCallBundleTransactionResult {
223 coinbase_diff,
224 eth_sent_to_coinbase,
225 from_address: signer,
226 gas_fees,
227 gas_price: U256::from(gas_price),
228 gas_used,
229 to_address: tx.to(),
230 tx_hash: *tx.tx_hash(),
231 value,
232 revert,
233 };
234 results.push(tx_res);
235
236 if transactions.peek().is_some() {
239 evm.db_mut().commit(state)
242 }
243 }
244
245 let coinbase_diff = coinbase_balance_after_tx.saturating_sub(initial_coinbase);
248 let eth_sent_to_coinbase = coinbase_diff.saturating_sub(total_gas_fees);
249 let bundle_gas_price =
250 coinbase_diff.checked_div(U256::from(total_gas_used)).unwrap_or_default();
251 let res = EthCallBundleResponse {
252 bundle_gas_price,
253 bundle_hash: hasher.finalize(),
254 coinbase_diff,
255 eth_sent_to_coinbase,
256 gas_fees: total_gas_fees,
257 results,
258 state_block_number: state_block_number.to(),
259 total_gas_used,
260 };
261
262 Ok(res)
263 })
264 .await
265 }
266}
267
268#[async_trait::async_trait]
269impl<Eth> EthCallBundleApiServer for EthBundle<Eth>
270where
271 Eth: EthTransactions + LoadPendingBlock + Call + 'static,
272{
273 async fn call_bundle(&self, request: EthCallBundle) -> RpcResult<EthCallBundleResponse> {
274 Self::call_bundle(self, request).await.map_err(Into::into)
275 }
276}
277
278#[derive(Debug)]
280struct EthBundleInner<Eth> {
281 eth_api: Eth,
283 #[expect(dead_code)]
285 blocking_task_guard: BlockingTaskGuard,
286}
287
288impl<Eth> std::fmt::Debug for EthBundle<Eth> {
289 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
290 f.debug_struct("EthBundle").finish_non_exhaustive()
291 }
292}
293
294impl<Eth> Clone for EthBundle<Eth> {
295 fn clone(&self) -> Self {
296 Self { inner: Arc::clone(&self.inner) }
297 }
298}
299
300#[derive(Debug, thiserror::Error)]
302pub enum EthBundleError {
303 #[error("bundle missing txs")]
305 EmptyBundleTransactions,
306 #[error("bundle missing blockNumber")]
308 BundleMissingBlockNumber,
309 #[error("blob gas usage exceeds the limit of {0} gas per block.")]
311 Eip4844BlobGasExceeded(u64),
312}