use alloy_consensus::BlockHeader;
use alloy_eips::BlockNumberOrTag;
use alloy_primitives::U256;
use alloy_rpc_types_eth::BlockId;
use alloy_rpc_types_mev::{
BundleItem, Inclusion, Privacy, RefundConfig, SendBundleRequest, SimBundleLogs,
SimBundleOverrides, SimBundleResponse, Validity,
};
use jsonrpsee::core::RpcResult;
use reth_chainspec::EthChainSpec;
use reth_evm::{env::EvmEnv, ConfigureEvm, ConfigureEvmEnv};
use reth_provider::{ChainSpecProvider, HeaderProvider, ProviderTx};
use reth_revm::database::StateProviderDatabase;
use reth_rpc_api::MevSimApiServer;
use reth_rpc_eth_api::{
helpers::{Call, EthTransactions, LoadPendingBlock},
FromEthApiError, RpcNodeCore,
};
use reth_rpc_eth_types::{utils::recover_raw_transaction, EthApiError};
use reth_tasks::pool::BlockingTaskGuard;
use reth_transaction_pool::{PoolConsensusTx, PoolPooledTx, PoolTransaction, TransactionPool};
use revm::{
db::CacheDB,
primitives::{Address, EnvWithHandlerCfg, ResultAndState, SpecId, TxEnv},
DatabaseCommit, DatabaseRef,
};
use std::{sync::Arc, time::Duration};
use tracing::info;
/// Maximum bundle nesting depth accepted when flattening a request. The top-level bundle counts
/// as depth 1; anything deeper than this limit is rejected with
/// [`EthSimBundleError::MaxDepth`].
const MAX_NESTED_BUNDLE_DEPTH: usize = 5;
/// Maximum number of body items a single (possibly nested) bundle may hold; larger bundles are
/// rejected with [`EthSimBundleError::BundleTooLarge`].
const MAX_BUNDLE_BODY_SIZE: usize = 50;
/// Simulation timeout used when the caller supplies none, or supplies one above
/// [`MAX_SIM_TIMEOUT`].
const DEFAULT_SIM_TIMEOUT: Duration = Duration::from_secs(5);
/// Largest caller-supplied simulation timeout that is honoured.
const MAX_SIM_TIMEOUT: Duration = Duration::from_secs(30);
/// Gas amount accounted per refund payout: it is multiplied by the base fee to estimate the
/// payout fee and added to the bundle's total gas used.
const SBUNDLE_PAYOUT_MAX_COST: u64 = 30_000;
/// A single transaction extracted from a (possibly nested) bundle, together with the settings
/// of the bundle that directly contained it.
#[derive(Clone, Debug)]
pub struct FlattenedBundleItem<T> {
/// The transaction to execute.
pub tx: T,
/// Signer recovered from the raw transaction.
pub signer: Address,
/// Whether the simulation may continue when this transaction does not succeed.
pub can_revert: bool,
/// Inclusion settings copied from the containing bundle.
pub inclusion: Inclusion,
/// Validity settings copied from the containing bundle, if any.
pub validity: Option<Validity>,
/// Privacy settings copied from the containing bundle, if any.
pub privacy: Option<Privacy>,
/// Refund percentage taken from the containing bundle's refund entry whose body index matches
/// this item's position, if such an entry exists.
pub refund_percent: Option<u64>,
/// Refund configuration copied from the containing bundle's validity settings, if any.
pub refund_configs: Option<Vec<RefundConfig>>,
}
/// Bundle simulation handler. All state lives behind an [`Arc`], so clones share the same inner
/// value.
pub struct EthSimBundle<Eth> {
// Shared state: the wrapped eth API and the blocking task guard.
inner: Arc<EthSimBundleInner<Eth>>,
}
impl<Eth> EthSimBundle<Eth> {
    /// Builds a new handler that owns `eth_api` and `blocking_task_guard` behind a shared
    /// reference-counted pointer.
    pub fn new(eth_api: Eth, blocking_task_guard: BlockingTaskGuard) -> Self {
        let shared = EthSimBundleInner { eth_api, blocking_task_guard };
        Self { inner: Arc::new(shared) }
    }

    /// Borrows the wrapped eth API.
    pub fn eth_api(&self) -> &Eth {
        let EthSimBundleInner { eth_api, .. } = &*self.inner;
        eth_api
    }
}
impl<Eth> EthSimBundle<Eth>
where
Eth: EthTransactions + LoadPendingBlock + Call + 'static,
{
fn parse_and_flatten_bundle(
&self,
request: &SendBundleRequest,
) -> Result<Vec<FlattenedBundleItem<ProviderTx<Eth::Provider>>>, EthApiError> {
let mut items = Vec::new();
let mut stack = Vec::new();
stack.push((request, 0, 1));
while let Some((current_bundle, mut idx, depth)) = stack.pop() {
if depth > MAX_NESTED_BUNDLE_DEPTH {
return Err(EthApiError::InvalidParams(EthSimBundleError::MaxDepth.to_string()));
}
let inclusion = ¤t_bundle.inclusion;
let validity = ¤t_bundle.validity;
let privacy = ¤t_bundle.privacy;
let block_number = inclusion.block_number();
let max_block_number = inclusion.max_block_number().unwrap_or(block_number);
if max_block_number < block_number || block_number == 0 {
return Err(EthApiError::InvalidParams(
EthSimBundleError::InvalidInclusion.to_string(),
));
}
if current_bundle.bundle_body.len() > MAX_BUNDLE_BODY_SIZE {
return Err(EthApiError::InvalidParams(
EthSimBundleError::BundleTooLarge.to_string(),
));
}
if let Some(validity) = ¤t_bundle.validity {
if let Some(refunds) = &validity.refund {
let mut total_percent = 0;
for refund in refunds {
if refund.body_idx as usize >= current_bundle.bundle_body.len() {
return Err(EthApiError::InvalidParams(
EthSimBundleError::InvalidValidity.to_string(),
));
}
if 100 - total_percent < refund.percent {
return Err(EthApiError::InvalidParams(
EthSimBundleError::InvalidValidity.to_string(),
));
}
total_percent += refund.percent;
}
}
if let Some(refund_configs) = &validity.refund_config {
let mut total_percent = 0;
for refund_config in refund_configs {
if 100 - total_percent < refund_config.percent {
return Err(EthApiError::InvalidParams(
EthSimBundleError::InvalidValidity.to_string(),
));
}
total_percent += refund_config.percent;
}
}
}
let body = ¤t_bundle.bundle_body;
while idx < body.len() {
match &body[idx] {
BundleItem::Tx { tx, can_revert } => {
let recovered_tx = recover_raw_transaction::<PoolPooledTx<Eth::Pool>>(tx)
.map_err(EthApiError::from)?;
let (tx, signer) = recovered_tx.to_components();
let tx: PoolConsensusTx<Eth::Pool> =
<Eth::Pool as TransactionPool>::Transaction::pooled_into_consensus(tx);
let refund_percent =
validity.as_ref().and_then(|v| v.refund.as_ref()).and_then(|refunds| {
refunds.iter().find_map(|refund| {
(refund.body_idx as usize == idx).then_some(refund.percent)
})
});
let refund_configs =
validity.as_ref().and_then(|v| v.refund_config.clone());
let flattened_item = FlattenedBundleItem {
tx,
signer,
can_revert: *can_revert,
inclusion: inclusion.clone(),
validity: validity.clone(),
privacy: privacy.clone(),
refund_percent,
refund_configs,
};
items.push(flattened_item);
idx += 1;
}
BundleItem::Bundle { bundle } => {
stack.push((current_bundle, idx + 1, depth));
stack.push((bundle, 0, depth + 1));
break;
}
BundleItem::Hash { hash: _ } => {
return Err(EthApiError::InvalidParams(
EthSimBundleError::InvalidBundle.to_string(),
));
}
}
}
}
Ok(items)
}
async fn sim_bundle(
&self,
request: SendBundleRequest,
overrides: SimBundleOverrides,
logs: bool,
) -> Result<SimBundleResponse, Eth::Error> {
let SimBundleOverrides {
parent_block,
block_number,
coinbase,
timestamp,
gas_limit,
base_fee,
..
} = overrides;
let flattened_bundle = self.parse_and_flatten_bundle(&request)?;
let block_id = parent_block.unwrap_or(BlockId::Number(BlockNumberOrTag::Pending));
let (evm_env, current_block) = self.eth_api().evm_env_at(block_id).await?;
let EvmEnv { cfg_env_with_handler_cfg, mut block_env } = evm_env;
let parent_header = RpcNodeCore::provider(&self.inner.eth_api)
.header_by_number(block_env.number.saturating_to::<u64>())
.map_err(EthApiError::from_eth_err)? .ok_or_else(|| {
EthApiError::HeaderNotFound((block_env.number.saturating_to::<u64>()).into())
})?;
if let Some(block_number) = block_number {
block_env.number = U256::from(block_number);
}
if let Some(coinbase) = coinbase {
block_env.coinbase = coinbase;
}
if let Some(timestamp) = timestamp {
block_env.timestamp = U256::from(timestamp);
}
if let Some(gas_limit) = gas_limit {
block_env.gas_limit = U256::from(gas_limit);
}
if let Some(base_fee) = base_fee {
block_env.basefee = U256::from(base_fee);
} else if cfg_env_with_handler_cfg.handler_cfg.spec_id.is_enabled_in(SpecId::LONDON) {
if let Some(base_fee) = parent_header.next_block_base_fee(
RpcNodeCore::provider(&self.inner.eth_api)
.chain_spec()
.base_fee_params_at_block(block_env.number.saturating_to::<u64>()),
) {
block_env.basefee = U256::from(base_fee);
}
}
let eth_api = self.inner.eth_api.clone();
let sim_response = self
.inner
.eth_api
.spawn_with_state_at_block(current_block, move |state| {
let current_block_number = current_block.as_u64().unwrap();
let coinbase = block_env.coinbase;
let basefee = block_env.basefee;
let env = EnvWithHandlerCfg::new_with_cfg_env(
cfg_env_with_handler_cfg,
block_env,
TxEnv::default(),
);
let db = CacheDB::new(StateProviderDatabase::new(state));
let initial_coinbase_balance = DatabaseRef::basic_ref(&db, coinbase)
.map_err(EthApiError::from_eth_err)?
.map(|acc| acc.balance)
.unwrap_or_default();
let mut coinbase_balance_before_tx = initial_coinbase_balance;
let mut total_gas_used = 0;
let mut total_profit = U256::ZERO;
let mut refundable_value = U256::ZERO;
let mut body_logs: Vec<SimBundleLogs> = Vec::new();
let mut evm = eth_api.evm_config().evm_with_env(db, env);
for item in &flattened_bundle {
let block_number = item.inclusion.block_number();
let max_block_number =
item.inclusion.max_block_number().unwrap_or(block_number);
if current_block_number < block_number ||
current_block_number > max_block_number
{
return Err(EthApiError::InvalidParams(
EthSimBundleError::InvalidInclusion.to_string(),
)
.into());
}
eth_api.evm_config().fill_tx_env(evm.tx_mut(), &item.tx, item.signer);
let ResultAndState { result, state } =
evm.transact().map_err(EthApiError::from_eth_err)?;
if !result.is_success() && !item.can_revert {
return Err(EthApiError::InvalidParams(
EthSimBundleError::BundleTransactionFailed.to_string(),
)
.into());
}
let gas_used = result.gas_used();
total_gas_used += gas_used;
let coinbase_balance_after_tx =
state.get(&coinbase).map(|acc| acc.info.balance).unwrap_or_default();
let coinbase_diff =
coinbase_balance_after_tx.saturating_sub(coinbase_balance_before_tx);
total_profit += coinbase_diff;
if item.refund_percent.is_none() {
refundable_value += coinbase_diff;
}
coinbase_balance_before_tx = coinbase_balance_after_tx;
if logs {
let tx_logs = result.logs().to_vec();
let sim_bundle_logs =
SimBundleLogs { tx_logs: Some(tx_logs), bundle_logs: None };
body_logs.push(sim_bundle_logs);
}
evm.context.evm.db.commit(state);
}
for item in &flattened_bundle {
if let Some(refund_percent) = item.refund_percent {
let refund_configs = item.refund_configs.clone().unwrap_or_else(|| {
vec![RefundConfig { address: item.signer, percent: 100 }]
});
let payout_tx_fee = basefee *
U256::from(SBUNDLE_PAYOUT_MAX_COST) *
U256::from(refund_configs.len() as u64);
total_gas_used += SBUNDLE_PAYOUT_MAX_COST * refund_configs.len() as u64;
let payout_value =
refundable_value * U256::from(refund_percent) / U256::from(100);
if payout_tx_fee > payout_value {
return Err(EthApiError::InvalidParams(
EthSimBundleError::NegativeProfit.to_string(),
)
.into());
}
total_profit = total_profit.checked_sub(payout_value).ok_or(
EthApiError::InvalidParams(
EthSimBundleError::NegativeProfit.to_string(),
),
)?;
refundable_value = refundable_value.checked_sub(payout_value).ok_or(
EthApiError::InvalidParams(
EthSimBundleError::NegativeProfit.to_string(),
),
)?;
}
}
let mev_gas_price = if total_gas_used != 0 {
total_profit / U256::from(total_gas_used)
} else {
U256::ZERO
};
Ok(SimBundleResponse {
success: true,
state_block: current_block_number,
error: None,
logs: Some(body_logs),
gas_used: total_gas_used,
mev_gas_price,
profit: total_profit,
refundable_value,
exec_error: None,
revert: None,
})
})
.await
.map_err(|_| {
EthApiError::InvalidParams(EthSimBundleError::BundleTimeout.to_string())
})?;
Ok(sim_response)
}
}
#[async_trait::async_trait]
impl<Eth> MevSimApiServer for EthSimBundle<Eth>
where
    Eth: EthTransactions + LoadPendingBlock + Call + 'static,
{
    /// RPC entry point for bundle simulation.
    ///
    /// Runs the inherent `sim_bundle` with log collection enabled, bounded by a timeout. A
    /// caller-supplied timeout (in seconds) is used only when it does not exceed
    /// [`MAX_SIM_TIMEOUT`]; otherwise, or when none is given, [`DEFAULT_SIM_TIMEOUT`] applies.
    /// When the timeout elapses an invalid-params error carrying the
    /// [`EthSimBundleError::BundleTimeout`] message is returned.
    async fn sim_bundle(
        &self,
        request: SendBundleRequest,
        overrides: SimBundleOverrides,
    ) -> RpcResult<SimBundleResponse> {
        info!("mev_simBundle called, request: {:?}, overrides: {:?}", request, overrides);

        // Over-limit requests fall back to the default instead of being clamped to the maximum.
        let timeout = match overrides.timeout.map(Duration::from_secs) {
            Some(requested) if requested <= MAX_SIM_TIMEOUT => requested,
            _ => DEFAULT_SIM_TIMEOUT,
        };

        let simulation = Self::sim_bundle(self, request, overrides, true);
        let outcome = tokio::time::timeout(timeout, simulation).await.map_err(|_elapsed| {
            EthApiError::InvalidParams(EthSimBundleError::BundleTimeout.to_string())
        })?;

        outcome.map_err(Into::into)
    }
}
/// State shared by all clones of [`EthSimBundle`].
#[derive(Debug)]
struct EthSimBundleInner<Eth> {
// The eth API used to load state, build the EVM environment and run the simulation.
// NOTE(review): this field is read via `EthSimBundle::eth_api`, so the `dead_code` allowance
// looks unnecessary — confirm before removing.
#[allow(dead_code)]
eth_api: Eth,
// Stored at construction; nothing in the visible code reads it, hence the `dead_code`
// allowance.
#[allow(dead_code)]
blocking_task_guard: BlockingTaskGuard,
}
impl<Eth> std::fmt::Debug for EthSimBundle<Eth> {
    /// Formats the handler as an opaque `EthSimBundle` struct without listing any fields, so no
    /// `Debug` bound on `Eth` is needed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("EthSimBundle");
        builder.finish_non_exhaustive()
    }
}
impl<Eth> Clone for EthSimBundle<Eth> {
    /// Returns a new handle to the same shared inner state; only the reference count changes,
    /// so no `Clone` bound on `Eth` is needed.
    fn clone(&self) -> Self {
        let inner = Arc::clone(&self.inner);
        Self { inner }
    }
}
/// Failure reasons reported by bundle simulation. In the visible code they are converted to
/// their display string and wrapped in an invalid-params RPC error.
#[derive(Debug, thiserror::Error)]
pub enum EthSimBundleError {
/// Bundles are nested deeper than the allowed maximum depth.
#[error("max depth reached")]
MaxDepth,
/// Not constructed anywhere in the visible code.
#[error("unmatched bundle")]
UnmatchedBundle,
/// A bundle body holds more items than the allowed maximum.
#[error("bundle too large")]
BundleTooLarge,
/// A refund references a body index outside the bundle, or refund / refund-config
/// percentages add up to more than 100.
#[error("invalid validity")]
InvalidValidity,
/// The inclusion block is 0, the inclusion range is empty, or the simulated state block lies
/// outside the inclusion range.
#[error("invalid inclusion")]
InvalidInclusion,
/// The bundle body contains a hash item.
#[error("invalid bundle")]
InvalidBundle,
/// The simulation did not complete within the timeout.
#[error("bundle simulation timed out")]
BundleTimeout,
/// A transaction that is not allowed to revert did not succeed.
#[error("bundle transaction failed")]
BundleTransactionFailed,
/// A refund payout is smaller than its payout fee, or exceeds the remaining profit or
/// refundable value.
#[error("bundle simulation returned negative profit")]
NegativeProfit,
}