1use alloy_consensus::{transaction::TxHashRef, EnvKzgSettings, Transaction as _};
4use alloy_eips::eip7840::BlobParams;
5use alloy_evm::env::BlockEnvironment;
6use alloy_primitives::{uint, Keccak256, U256};
7use alloy_rpc_types_mev::{EthCallBundle, EthCallBundleResponse, EthCallBundleTransactionResult};
8use jsonrpsee::core::RpcResult;
9use reth_chainspec::{ChainSpecProvider, EthChainSpec};
10use reth_evm::{ConfigureEvm, Evm};
11use reth_rpc_eth_api::{
12 helpers::{Call, EthTransactions, LoadPendingBlock},
13 EthCallBundleApiServer, FromEthApiError, FromEvmError,
14};
15use reth_rpc_eth_types::{utils::recover_raw_transaction, EthApiError, RpcInvalidTransactionError};
16use reth_tasks::pool::BlockingTaskGuard;
17use reth_transaction_pool::{
18 EthBlobTransactionSidecar, EthPoolTransaction, PoolPooledTx, PoolTransaction, TransactionPool,
19};
20use revm::{
21 context::Block, context_interface::result::ResultAndState, DatabaseCommit, DatabaseRef,
22};
23use std::sync::Arc;
24
25pub struct EthBundle<Eth> {
27 inner: Arc<EthBundleInner<Eth>>,
29}
30
31impl<Eth> EthBundle<Eth> {
32 pub fn new(eth_api: Eth, blocking_task_guard: BlockingTaskGuard) -> Self {
34 Self { inner: Arc::new(EthBundleInner { eth_api, blocking_task_guard }) }
35 }
36
37 pub fn eth_api(&self) -> &Eth {
39 &self.inner.eth_api
40 }
41}
42
43impl<Eth> EthBundle<Eth>
44where
45 Eth: EthTransactions + LoadPendingBlock + Call + 'static,
46{
47 pub async fn call_bundle(
52 &self,
53 bundle: EthCallBundle,
54 ) -> Result<EthCallBundleResponse, Eth::Error> {
55 let EthCallBundle {
56 txs,
57 block_number,
58 coinbase,
59 state_block_number,
60 timeout: _,
61 timestamp,
62 gas_limit,
63 difficulty,
64 base_fee,
65 ..
66 } = bundle;
67 if txs.is_empty() {
68 return Err(EthApiError::InvalidParams(
69 EthBundleError::EmptyBundleTransactions.to_string(),
70 )
71 .into())
72 }
73 if block_number == 0 {
74 return Err(EthApiError::InvalidParams(
75 EthBundleError::BundleMissingBlockNumber.to_string(),
76 )
77 .into())
78 }
79
80 let transactions = txs
81 .into_iter()
82 .map(|tx| recover_raw_transaction::<PoolPooledTx<Eth::Pool>>(&tx))
83 .collect::<Result<Vec<_>, _>>()?
84 .into_iter()
85 .collect::<Vec<_>>();
86
87 let block_id: alloy_rpc_types_eth::BlockId = state_block_number.into();
88 let (mut evm_env, at) = self.eth_api().evm_env_at(block_id).await?;
90
91 if let Some(coinbase) = coinbase {
92 evm_env.block_env.inner_mut().beneficiary = coinbase;
93 }
94
95 if let Some(timestamp) = timestamp {
97 evm_env.block_env.inner_mut().timestamp = U256::from(timestamp);
98 } else {
99 evm_env.block_env.inner_mut().timestamp += uint!(12_U256);
100 }
101
102 if let Some(difficulty) = difficulty {
103 evm_env.block_env.inner_mut().difficulty = U256::from(difficulty);
104 }
105
106 let blob_gas_used = transactions.iter().filter_map(|tx| tx.blob_gas_used()).sum::<u64>();
109 if blob_gas_used > 0 {
110 let blob_params = self
111 .eth_api()
112 .provider()
113 .chain_spec()
114 .blob_params_at_timestamp(evm_env.block_env.timestamp().saturating_to())
115 .unwrap_or_else(BlobParams::cancun);
116 if transactions.iter().filter_map(|tx| tx.blob_gas_used()).sum::<u64>() >
117 blob_params.max_blob_gas_per_block()
118 {
119 return Err(EthApiError::InvalidParams(
120 EthBundleError::Eip4844BlobGasExceeded(blob_params.max_blob_gas_per_block())
121 .to_string(),
122 )
123 .into())
124 }
125 }
126
127 evm_env.block_env.inner_mut().gas_limit = self.inner.eth_api.call_gas_limit();
129 if let Some(gas_limit) = gas_limit {
130 if gas_limit > evm_env.block_env.gas_limit() {
131 return Err(
132 EthApiError::InvalidTransaction(RpcInvalidTransactionError::GasTooHigh).into()
133 )
134 }
135 evm_env.block_env.inner_mut().gas_limit = gas_limit;
136 }
137
138 if let Some(base_fee) = base_fee {
139 evm_env.block_env.inner_mut().basefee = base_fee.try_into().unwrap_or(u64::MAX);
140 }
141
142 let state_block_number = evm_env.block_env.number();
143 evm_env.block_env.inner_mut().number = U256::from(block_number);
145
146 self.eth_api()
147 .spawn_with_state_at_block(at, move |eth_api, db| {
148 let coinbase = evm_env.block_env.beneficiary();
149 let basefee = evm_env.block_env.basefee();
150
151 let initial_coinbase = db
152 .basic_ref(coinbase)
153 .map_err(Eth::Error::from_eth_err)?
154 .map(|acc| acc.balance)
155 .unwrap_or_default();
156 let mut coinbase_balance_before_tx = initial_coinbase;
157 let mut coinbase_balance_after_tx = initial_coinbase;
158 let mut total_gas_used = 0u64;
159 let mut total_gas_fees = U256::ZERO;
160 let mut hasher = Keccak256::new();
161
162 let mut evm = eth_api.evm_config().evm_with_env(db, evm_env);
163
164 let mut results = Vec::with_capacity(transactions.len());
165 let mut transactions = transactions.into_iter().peekable();
166
167 while let Some(tx) = transactions.next() {
168 let signer = tx.signer();
169 let tx = {
170 let mut tx = <Eth::Pool as TransactionPool>::Transaction::from_pooled(tx);
171
172 if let EthBlobTransactionSidecar::Present(sidecar) = tx.take_blob() {
173 tx.validate_blob(&sidecar, EnvKzgSettings::Default.get()).map_err(
174 |e| {
175 Eth::Error::from_eth_err(EthApiError::InvalidParams(
176 e.to_string(),
177 ))
178 },
179 )?;
180 }
181
182 tx.into_consensus()
183 };
184
185 hasher.update(*tx.tx_hash());
186 let ResultAndState { result, state } = evm
187 .transact(eth_api.evm_config().tx_env(&tx))
188 .map_err(Eth::Error::from_evm_err)?;
189
190 let gas_price = tx
191 .effective_tip_per_gas(basefee)
192 .expect("fee is always valid; execution succeeded");
193 let gas_used = result.gas_used();
194 total_gas_used += gas_used;
195
196 let gas_fees = U256::from(gas_used) * U256::from(gas_price);
197 total_gas_fees += gas_fees;
198
199 coinbase_balance_after_tx =
201 state.get(&coinbase).map(|acc| acc.info.balance).unwrap_or_default();
202 let coinbase_diff =
203 coinbase_balance_after_tx.saturating_sub(coinbase_balance_before_tx);
204 let eth_sent_to_coinbase = coinbase_diff.saturating_sub(gas_fees);
205
206 coinbase_balance_before_tx = coinbase_balance_after_tx;
208
209 let (value, revert) = if result.is_success() {
211 let value = result.into_output().unwrap_or_default();
212 (Some(value), None)
213 } else {
214 let revert = result.into_output().unwrap_or_default();
215 (None, Some(revert))
216 };
217
218 let tx_res = EthCallBundleTransactionResult {
219 coinbase_diff,
220 eth_sent_to_coinbase,
221 from_address: signer,
222 gas_fees,
223 gas_price: U256::from(gas_price),
224 gas_used,
225 to_address: tx.to(),
226 tx_hash: *tx.tx_hash(),
227 value,
228 revert,
229 };
230 results.push(tx_res);
231
232 if transactions.peek().is_some() {
235 evm.db_mut().commit(state)
238 }
239 }
240
241 let coinbase_diff = coinbase_balance_after_tx.saturating_sub(initial_coinbase);
244 let eth_sent_to_coinbase = coinbase_diff.saturating_sub(total_gas_fees);
245 let bundle_gas_price =
246 coinbase_diff.checked_div(U256::from(total_gas_used)).unwrap_or_default();
247 let res = EthCallBundleResponse {
248 bundle_gas_price,
249 bundle_hash: hasher.finalize(),
250 coinbase_diff,
251 eth_sent_to_coinbase,
252 gas_fees: total_gas_fees,
253 results,
254 state_block_number: state_block_number.to(),
255 total_gas_used,
256 };
257
258 Ok(res)
259 })
260 .await
261 }
262}
263
264#[async_trait::async_trait]
265impl<Eth> EthCallBundleApiServer for EthBundle<Eth>
266where
267 Eth: EthTransactions + LoadPendingBlock + Call + 'static,
268{
269 async fn call_bundle(&self, request: EthCallBundle) -> RpcResult<EthCallBundleResponse> {
270 Self::call_bundle(self, request).await.map_err(Into::into)
271 }
272}
273
274#[derive(Debug)]
276struct EthBundleInner<Eth> {
277 eth_api: Eth,
279 #[expect(dead_code)]
281 blocking_task_guard: BlockingTaskGuard,
282}
283
284impl<Eth> std::fmt::Debug for EthBundle<Eth> {
285 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
286 f.debug_struct("EthBundle").finish_non_exhaustive()
287 }
288}
289
290impl<Eth> Clone for EthBundle<Eth> {
291 fn clone(&self) -> Self {
292 Self { inner: Arc::clone(&self.inner) }
293 }
294}
295
296#[derive(Debug, thiserror::Error)]
298pub enum EthBundleError {
299 #[error("bundle missing txs")]
301 EmptyBundleTransactions,
302 #[error("bundle missing blockNumber")]
304 BundleMissingBlockNumber,
305 #[error("blob gas usage exceeds the limit of {0} gas per block.")]
307 Eip4844BlobGasExceeded(u64),
308}