use alloy_consensus::{BlockHeader, Transaction as _};
use alloy_primitives::{Keccak256, U256};
use alloy_rpc_types_mev::{EthCallBundle, EthCallBundleResponse, EthCallBundleTransactionResult};
use jsonrpsee::core::RpcResult;
use reth_chainspec::EthChainSpec;
use reth_evm::{env::EvmEnv, ConfigureEvm, ConfigureEvmEnv};
use reth_primitives_traits::SignedTransaction;
use reth_provider::{ChainSpecProvider, HeaderProvider};
use reth_revm::database::StateProviderDatabase;
use reth_rpc_eth_api::{
helpers::{Call, EthTransactions, LoadPendingBlock},
EthCallBundleApiServer, FromEthApiError, FromEvmError, RpcNodeCore,
};
use reth_rpc_eth_types::{utils::recover_raw_transaction, EthApiError, RpcInvalidTransactionError};
use reth_tasks::pool::BlockingTaskGuard;
use reth_transaction_pool::{
EthBlobTransactionSidecar, EthPoolTransaction, PoolPooledTx, PoolTransaction, TransactionPool,
};
use revm::{
db::{CacheDB, DatabaseCommit, DatabaseRef},
primitives::{ResultAndState, TxEnv},
};
use revm_primitives::{EnvKzgSettings, EnvWithHandlerCfg, SpecId, MAX_BLOB_GAS_PER_BLOCK};
use std::sync::Arc;
/// Bundle-simulation handler that wraps an `Eth` API implementation.
///
/// The handler only holds an [`Arc`] to its inner state, so cloning it shares that state rather
/// than copying it.
pub struct EthBundle<Eth> {
/// Reference-counted inner state (the wrapped API and the blocking task guard).
inner: Arc<EthBundleInner<Eth>>,
}
impl<Eth> EthBundle<Eth> {
    /// Builds a new handler from the given `Eth` API implementation and blocking task guard.
    ///
    /// Both values are moved into a freshly allocated, reference-counted inner state.
    pub fn new(eth_api: Eth, blocking_task_guard: BlockingTaskGuard) -> Self {
        let inner = EthBundleInner { eth_api, blocking_task_guard };
        Self { inner: Arc::new(inner) }
    }

    /// Returns a reference to the wrapped `Eth` API implementation.
    pub fn eth_api(&self) -> &Eth {
        let Self { inner } = self;
        &inner.eth_api
    }
}
impl<Eth> EthBundle<Eth>
where
    Eth: EthTransactions + LoadPendingBlock + Call + 'static,
{
    /// Simulates a bundle of raw transactions on top of the state selected by
    /// `state_block_number` and reports per-transaction and aggregate results.
    ///
    /// The block environment is taken from the state block and then adjusted with the optional
    /// overrides carried by the request (`coinbase`, `timestamp`, `gas_limit`, `difficulty`,
    /// `base_fee`). The environment's block number is finally replaced by the requested
    /// `block_number`. Transactions are executed in order; the state changes of each transaction
    /// are committed before the next one runs.
    ///
    /// # Errors
    ///
    /// - [`EthApiError::InvalidParams`] if the bundle has no transactions, if `block_number` is
    ///   zero, if the summed blob gas of all transactions exceeds [`MAX_BLOB_GAS_PER_BLOCK`], or
    ///   if a blob sidecar fails validation.
    /// - [`EthApiError::InvalidTransaction`] with [`RpcInvalidTransactionError::GasTooHigh`] if
    ///   the requested `gas_limit` is above the API's call gas limit.
    /// - [`EthApiError::HeaderNotFound`] if the header needed to derive the base fee is missing.
    /// - Any error produced while decoding the raw transactions, loading the environment or
    ///   state, or executing a transaction in the EVM.
    pub async fn call_bundle(
        &self,
        bundle: EthCallBundle,
    ) -> Result<EthCallBundleResponse, Eth::Error> {
        let EthCallBundle {
            txs,
            block_number,
            coinbase,
            state_block_number,
            timeout: _,
            timestamp,
            gas_limit,
            difficulty,
            base_fee,
            ..
        } = bundle;

        if txs.is_empty() {
            return Err(EthApiError::InvalidParams(
                EthBundleError::EmptyBundleTransactions.to_string(),
            )
            .into())
        }
        if block_number == 0 {
            return Err(EthApiError::InvalidParams(
                EthBundleError::BundleMissingBlockNumber.to_string(),
            )
            .into())
        }

        // Decode and recover every raw transaction; the first failure aborts the request.
        let transactions = txs
            .into_iter()
            .map(|tx| recover_raw_transaction::<PoolPooledTx<Eth::Pool>>(&tx))
            .collect::<Result<Vec<_>, _>>()?;

        // Reject bundles whose combined blob gas cannot fit into a single block.
        if transactions.iter().filter_map(|tx| tx.blob_gas_used()).sum::<u64>() >
            MAX_BLOB_GAS_PER_BLOCK
        {
            return Err(EthApiError::InvalidParams(
                EthBundleError::Eip4844BlobGasExceeded.to_string(),
            )
            .into())
        }

        // Load the environment of the block whose state the bundle is executed against.
        let block_id: alloy_rpc_types_eth::BlockId = state_block_number.into();
        let (evm_env, at) = self.eth_api().evm_env_at(block_id).await?;
        let EvmEnv { cfg_env_with_handler_cfg, mut block_env } = evm_env;

        if let Some(coinbase) = coinbase {
            block_env.coinbase = coinbase;
        }

        if let Some(timestamp) = timestamp {
            block_env.timestamp = U256::from(timestamp);
        } else {
            // No explicit timestamp: advance the state block's timestamp by a hard-coded 12
            // (presumably one slot in seconds).
            block_env.timestamp += U256::from(12);
        }

        if let Some(difficulty) = difficulty {
            block_env.difficulty = U256::from(difficulty);
        }

        // The API's call gas limit is both the default and the upper bound for the simulation.
        block_env.gas_limit = U256::from(self.inner.eth_api.call_gas_limit());
        if let Some(gas_limit) = gas_limit {
            let gas_limit = U256::from(gas_limit);
            if gas_limit > block_env.gas_limit {
                return Err(
                    EthApiError::InvalidTransaction(RpcInvalidTransactionError::GasTooHigh).into()
                )
            }
            block_env.gas_limit = gas_limit;
        }

        if let Some(base_fee) = base_fee {
            block_env.basefee = U256::from(base_fee);
        } else if cfg_env_with_handler_cfg.handler_cfg.spec_id.is_enabled_in(SpecId::LONDON) {
            // No override: derive the base fee from the header at the environment's block number.
            let parent_block = block_env.number.saturating_to::<u64>();
            let parent = RpcNodeCore::provider(self.eth_api())
                .header_by_number(parent_block)
                .map_err(Eth::Error::from_eth_err)?
                .ok_or(EthApiError::HeaderNotFound(parent_block.into()))?;
            if let Some(base_fee) = parent.next_block_base_fee(
                RpcNodeCore::provider(self.eth_api())
                    .chain_spec()
                    .base_fee_params_at_block(parent_block),
            ) {
                block_env.basefee = U256::from(base_fee);
            }
        }

        // Remember the state block for the response, then run at the requested block number.
        let state_block_number = block_env.number;
        block_env.number = U256::from(block_number);

        let eth_api = self.eth_api().clone();

        self.eth_api()
            .spawn_with_state_at_block(at, move |state| {
                let coinbase = block_env.coinbase;
                // `to::<u64>()` panics when the value does not fit, and the base fee may come
                // from a caller-supplied override. Clamp instead of panicking; the EVM still
                // sees the full-width value through `block_env`.
                let basefee = Some(block_env.basefee.saturating_to::<u64>());
                let env = EnvWithHandlerCfg::new_with_cfg_env(
                    cfg_env_with_handler_cfg,
                    block_env,
                    TxEnv::default(),
                );
                let db = CacheDB::new(StateProviderDatabase::new(state));

                let initial_coinbase = db
                    .basic_ref(coinbase)
                    .map_err(Eth::Error::from_eth_err)?
                    .map(|acc| acc.balance)
                    .unwrap_or_default();
                let mut coinbase_balance_before_tx = initial_coinbase;
                let mut coinbase_balance_after_tx = initial_coinbase;
                let mut total_gas_used = 0u64;
                let mut total_gas_fees = U256::ZERO;
                // Accumulates every transaction hash; finalized into the bundle hash.
                let mut hasher = Keccak256::new();

                let mut evm = eth_api.evm_config().evm_with_env(db, env);

                let mut results = Vec::with_capacity(transactions.len());
                // Peekable so the loop can tell whether another transaction follows.
                let mut transactions = transactions.into_iter().peekable();

                while let Some(tx) = transactions.next() {
                    let signer = tx.signer();
                    let tx = {
                        let mut tx: <Eth::Pool as TransactionPool>::Transaction = tx.into();

                        // Validate the blob sidecar, if one is attached, before executing.
                        if let EthBlobTransactionSidecar::Present(sidecar) = tx.take_blob() {
                            tx.validate_blob(&sidecar, EnvKzgSettings::Default.get()).map_err(
                                |e| {
                                    Eth::Error::from_eth_err(EthApiError::InvalidParams(
                                        e.to_string(),
                                    ))
                                },
                            )?;
                        }

                        tx.into_consensus()
                    };

                    hasher.update(*tx.tx_hash());
                    let gas_price = tx.effective_gas_price(basefee);
                    eth_api.evm_config().fill_tx_env(evm.tx_mut(), &tx, signer);
                    let ResultAndState { result, state } =
                        evm.transact().map_err(Eth::Error::from_evm_err)?;

                    let gas_used = result.gas_used();
                    total_gas_used += gas_used;

                    let gas_fees = U256::from(gas_used) * U256::from(gas_price);
                    total_gas_fees += gas_fees;

                    // The coinbase is expected to be part of the resulting state. Should it ever
                    // be absent, this transaction did not touch it, so its balance is unchanged
                    // rather than zero.
                    coinbase_balance_after_tx = state
                        .get(&coinbase)
                        .map(|acc| acc.info.balance)
                        .unwrap_or(coinbase_balance_before_tx);
                    let coinbase_diff =
                        coinbase_balance_after_tx.saturating_sub(coinbase_balance_before_tx);
                    let eth_sent_to_coinbase = coinbase_diff.saturating_sub(gas_fees);

                    // This transaction's final balance is the baseline for the next one.
                    coinbase_balance_before_tx = coinbase_balance_after_tx;

                    // The output is reported as `value` on success and as `revert` otherwise.
                    let (value, revert) = if result.is_success() {
                        let value = result.into_output().unwrap_or_default();
                        (Some(value), None)
                    } else {
                        let revert = result.into_output().unwrap_or_default();
                        (None, Some(revert))
                    };

                    let tx_res = EthCallBundleTransactionResult {
                        coinbase_diff,
                        eth_sent_to_coinbase,
                        from_address: signer,
                        gas_fees,
                        gas_price: U256::from(gas_price),
                        gas_used,
                        to_address: tx.to(),
                        tx_hash: *tx.tx_hash(),
                        value,
                        revert,
                    };
                    results.push(tx_res);

                    // Commit this transaction's changes only if another transaction will run on
                    // top of them.
                    if transactions.peek().is_some() {
                        evm.context.evm.db.commit(state)
                    }
                }

                // Aggregate figures for the whole bundle.
                let coinbase_diff = coinbase_balance_after_tx.saturating_sub(initial_coinbase);
                let eth_sent_to_coinbase = coinbase_diff.saturating_sub(total_gas_fees);
                // `checked_div` yields `None` for zero total gas; report a price of zero then.
                let bundle_gas_price =
                    coinbase_diff.checked_div(U256::from(total_gas_used)).unwrap_or_default();
                let res = EthCallBundleResponse {
                    bundle_gas_price,
                    bundle_hash: hasher.finalize(),
                    coinbase_diff,
                    eth_sent_to_coinbase,
                    gas_fees: total_gas_fees,
                    results,
                    state_block_number: state_block_number.to(),
                    total_gas_used,
                };

                Ok(res)
            })
            .await
    }
}
#[async_trait::async_trait]
impl<Eth> EthCallBundleApiServer for EthBundle<Eth>
where
    Eth: EthTransactions + LoadPendingBlock + Call + 'static,
{
    /// RPC entry point for bundle simulation.
    ///
    /// Forwards the request to `Self::call_bundle` and converts any failure into the error type
    /// of [`RpcResult`].
    async fn call_bundle(&self, request: EthCallBundle) -> RpcResult<EthCallBundleResponse> {
        let outcome = Self::call_bundle(self, request).await;
        outcome.map_err(Into::into)
    }
}
/// Inner state of [`EthBundle`], held behind an [`Arc`] by the handler.
#[derive(Debug)]
struct EthBundleInner<Eth> {
/// The `Eth` API implementation that bundle simulation is executed through.
eth_api: Eth,
/// Guard supplied at construction time.
// NOTE(review): nothing in the visible code reads this field, hence the lint allowance;
// presumably it is kept for rate-limiting blocking work — confirm or remove.
#[allow(dead_code)]
blocking_task_guard: BlockingTaskGuard,
}
impl<Eth> std::fmt::Debug for EthBundle<Eth> {
    /// Formats the handler as an opaque `EthBundle` struct without listing any fields, so no
    /// `Debug` bound on `Eth` is required.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("EthBundle");
        builder.finish_non_exhaustive()
    }
}
impl<Eth> Clone for EthBundle<Eth> {
    /// Returns a new handle to the same inner state; only the reference count is bumped, so no
    /// `Clone` bound on `Eth` is required.
    fn clone(&self) -> Self {
        let inner = Arc::clone(&self.inner);
        Self { inner }
    }
}
/// Validation failures for a bundle simulation request.
///
/// `call_bundle` renders these to strings and wraps them in `EthApiError::InvalidParams`.
#[derive(Debug, thiserror::Error)]
pub enum EthBundleError {
/// The request's transaction list is empty.
#[error("bundle missing txs")]
EmptyBundleTransactions,
/// The request's block number is zero, which is treated as missing.
#[error("bundle missing blockNumber")]
BundleMissingBlockNumber,
/// The summed blob gas of all bundle transactions is above `MAX_BLOB_GAS_PER_BLOCK`.
#[error("blob gas usage exceeds the limit of {MAX_BLOB_GAS_PER_BLOCK} gas per block.")]
Eip4844BlobGasExceeded,
}