use std::sync::Arc;
use alloy_consensus::BlockHeader;
use alloy_eips::BlockNumberOrTag;
use alloy_network::Ethereum;
use alloy_primitives::{Bytes, U256};
use derive_more::Deref;
use reth_primitives::NodePrimitives;
use reth_provider::{
BlockReader, BlockReaderIdExt, CanonStateSubscriptions, ChainSpecProvider, ProviderBlock,
ProviderReceipt,
};
use reth_rpc_eth_api::{
helpers::{EthSigner, SpawnBlocking},
node::RpcNodeCoreExt,
EthApiTypes, RpcNodeCore,
};
use reth_rpc_eth_types::{
EthApiBuilderCtx, EthApiError, EthStateCache, FeeHistoryCache, GasCap, GasPriceOracle,
PendingBlock,
};
use reth_tasks::{
pool::{BlockingTaskGuard, BlockingTaskPool},
TaskSpawner, TokioTaskExecutor,
};
use tokio::sync::{broadcast, Mutex};
use crate::eth::EthTxBuilder;
const DEFAULT_BROADCAST_CAPACITY: usize = 2000;
#[derive(Deref)]
pub struct EthApi<Provider: BlockReader, Pool, Network, EvmConfig> {
#[deref]
pub(super) inner: Arc<EthApiInner<Provider, Pool, Network, EvmConfig>>,
pub tx_resp_builder: EthTxBuilder,
}
impl<Provider, Pool, Network, EvmConfig> Clone for EthApi<Provider, Pool, Network, EvmConfig>
where
Provider: BlockReader,
{
fn clone(&self) -> Self {
Self { inner: self.inner.clone(), tx_resp_builder: EthTxBuilder }
}
}
impl<Provider, Pool, Network, EvmConfig> EthApi<Provider, Pool, Network, EvmConfig>
where
Provider: BlockReaderIdExt,
{
#[allow(clippy::too_many_arguments)]
pub fn new(
provider: Provider,
pool: Pool,
network: Network,
eth_cache: EthStateCache<Provider::Block, Provider::Receipt>,
gas_oracle: GasPriceOracle<Provider>,
gas_cap: impl Into<GasCap>,
max_simulate_blocks: u64,
eth_proof_window: u64,
blocking_task_pool: BlockingTaskPool,
fee_history_cache: FeeHistoryCache,
evm_config: EvmConfig,
proof_permits: usize,
) -> Self {
let inner = EthApiInner::new(
provider,
pool,
network,
eth_cache,
gas_oracle,
gas_cap,
max_simulate_blocks,
eth_proof_window,
blocking_task_pool,
fee_history_cache,
evm_config,
TokioTaskExecutor::default(),
proof_permits,
);
Self { inner: Arc::new(inner), tx_resp_builder: EthTxBuilder }
}
}
impl<Provider, Pool, EvmConfig, Network> EthApi<Provider, Pool, Network, EvmConfig>
where
Provider: ChainSpecProvider + BlockReaderIdExt + Clone + 'static,
Pool: Clone,
EvmConfig: Clone,
Network: Clone,
{
pub fn with_spawner<Tasks, Events>(
ctx: &EthApiBuilderCtx<Provider, Pool, EvmConfig, Network, Tasks, Events>,
) -> Self
where
Tasks: TaskSpawner + Clone + 'static,
Events: CanonStateSubscriptions<
Primitives: NodePrimitives<
Block = ProviderBlock<Provider>,
Receipt = ProviderReceipt<Provider>,
>,
>,
{
let blocking_task_pool =
BlockingTaskPool::build().expect("failed to build blocking task pool");
let inner = EthApiInner::new(
ctx.provider.clone(),
ctx.pool.clone(),
ctx.network.clone(),
ctx.cache.clone(),
ctx.new_gas_price_oracle(),
ctx.config.rpc_gas_cap,
ctx.config.rpc_max_simulate_blocks,
ctx.config.eth_proof_window,
blocking_task_pool,
ctx.new_fee_history_cache(),
ctx.evm_config.clone(),
ctx.executor.clone(),
ctx.config.proof_permits,
);
Self { inner: Arc::new(inner), tx_resp_builder: EthTxBuilder }
}
}
impl<Provider, Pool, Network, EvmConfig> EthApiTypes for EthApi<Provider, Pool, Network, EvmConfig>
where
Self: Send + Sync,
Provider: BlockReader,
{
type Error = EthApiError;
type NetworkTypes = Ethereum;
type TransactionCompat = EthTxBuilder;
fn tx_resp_builder(&self) -> &Self::TransactionCompat {
&self.tx_resp_builder
}
}
impl<Provider, Pool, Network, EvmConfig> RpcNodeCore for EthApi<Provider, Pool, Network, EvmConfig>
where
Provider: BlockReader + Clone + Unpin,
Pool: Send + Sync + Clone + Unpin,
Network: Send + Sync + Clone,
EvmConfig: Send + Sync + Clone + Unpin,
{
type Provider = Provider;
type Pool = Pool;
type Evm = EvmConfig;
type Network = Network;
type PayloadBuilder = ();
fn pool(&self) -> &Self::Pool {
self.inner.pool()
}
fn evm_config(&self) -> &Self::Evm {
self.inner.evm_config()
}
fn network(&self) -> &Self::Network {
self.inner.network()
}
fn payload_builder(&self) -> &Self::PayloadBuilder {
&()
}
fn provider(&self) -> &Self::Provider {
self.inner.provider()
}
}
impl<Provider, Pool, Network, EvmConfig> RpcNodeCoreExt
for EthApi<Provider, Pool, Network, EvmConfig>
where
Provider: BlockReader + Clone + Unpin,
Pool: Send + Sync + Clone + Unpin,
Network: Send + Sync + Clone,
EvmConfig: Send + Sync + Clone + Unpin,
{
#[inline]
fn cache(&self) -> &EthStateCache<ProviderBlock<Provider>, ProviderReceipt<Provider>> {
self.inner.cache()
}
}
impl<Provider, Pool, Network, EvmConfig> std::fmt::Debug
for EthApi<Provider, Pool, Network, EvmConfig>
where
Provider: BlockReader,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("EthApi").finish_non_exhaustive()
}
}
impl<Provider, Pool, Network, EvmConfig> SpawnBlocking
for EthApi<Provider, Pool, Network, EvmConfig>
where
Self: Clone + Send + Sync + 'static,
Provider: BlockReader,
{
#[inline]
fn io_task_spawner(&self) -> impl TaskSpawner {
self.inner.task_spawner()
}
#[inline]
fn tracing_task_pool(&self) -> &BlockingTaskPool {
self.inner.blocking_task_pool()
}
#[inline]
fn tracing_task_guard(&self) -> &BlockingTaskGuard {
self.inner.blocking_task_guard()
}
}
#[allow(missing_debug_implementations)]
pub struct EthApiInner<Provider: BlockReader, Pool, Network, EvmConfig> {
pool: Pool,
provider: Provider,
network: Network,
signers: parking_lot::RwLock<Vec<Box<dyn EthSigner<Provider::Transaction>>>>,
eth_cache: EthStateCache<Provider::Block, Provider::Receipt>,
gas_oracle: GasPriceOracle<Provider>,
gas_cap: u64,
max_simulate_blocks: u64,
eth_proof_window: u64,
starting_block: U256,
task_spawner: Box<dyn TaskSpawner>,
pending_block: Mutex<Option<PendingBlock<Provider::Block, Provider::Receipt>>>,
blocking_task_pool: BlockingTaskPool,
fee_history_cache: FeeHistoryCache,
evm_config: EvmConfig,
blocking_task_guard: BlockingTaskGuard,
raw_tx_sender: broadcast::Sender<Bytes>,
}
impl<Provider, Pool, Network, EvmConfig> EthApiInner<Provider, Pool, Network, EvmConfig>
where
Provider: BlockReaderIdExt,
{
#[allow(clippy::too_many_arguments)]
pub fn new(
provider: Provider,
pool: Pool,
network: Network,
eth_cache: EthStateCache<Provider::Block, Provider::Receipt>,
gas_oracle: GasPriceOracle<Provider>,
gas_cap: impl Into<GasCap>,
max_simulate_blocks: u64,
eth_proof_window: u64,
blocking_task_pool: BlockingTaskPool,
fee_history_cache: FeeHistoryCache,
evm_config: EvmConfig,
task_spawner: impl TaskSpawner + 'static,
proof_permits: usize,
) -> Self {
let signers = parking_lot::RwLock::new(Default::default());
let starting_block = U256::from(
provider
.header_by_number_or_tag(BlockNumberOrTag::Latest)
.ok()
.flatten()
.map(|header| header.number())
.unwrap_or_default(),
);
let (raw_tx_sender, _) = broadcast::channel(DEFAULT_BROADCAST_CAPACITY);
Self {
provider,
pool,
network,
signers,
eth_cache,
gas_oracle,
gas_cap: gas_cap.into().into(),
max_simulate_blocks,
eth_proof_window,
starting_block,
task_spawner: Box::new(task_spawner),
pending_block: Default::default(),
blocking_task_pool,
fee_history_cache,
evm_config,
blocking_task_guard: BlockingTaskGuard::new(proof_permits),
raw_tx_sender,
}
}
}
impl<Provider, Pool, Network, EvmConfig> EthApiInner<Provider, Pool, Network, EvmConfig>
where
Provider: BlockReader,
{
#[inline]
pub const fn provider(&self) -> &Provider {
&self.provider
}
#[inline]
pub const fn cache(&self) -> &EthStateCache<Provider::Block, Provider::Receipt> {
&self.eth_cache
}
#[inline]
pub const fn pending_block(
&self,
) -> &Mutex<Option<PendingBlock<Provider::Block, Provider::Receipt>>> {
&self.pending_block
}
#[inline]
pub const fn task_spawner(&self) -> &dyn TaskSpawner {
&*self.task_spawner
}
#[inline]
pub const fn blocking_task_pool(&self) -> &BlockingTaskPool {
&self.blocking_task_pool
}
#[inline]
pub const fn evm_config(&self) -> &EvmConfig {
&self.evm_config
}
#[inline]
pub const fn pool(&self) -> &Pool {
&self.pool
}
#[inline]
pub const fn gas_cap(&self) -> u64 {
self.gas_cap
}
#[inline]
pub const fn max_simulate_blocks(&self) -> u64 {
self.max_simulate_blocks
}
#[inline]
pub const fn gas_oracle(&self) -> &GasPriceOracle<Provider> {
&self.gas_oracle
}
#[inline]
pub const fn fee_history_cache(&self) -> &FeeHistoryCache {
&self.fee_history_cache
}
#[inline]
pub const fn signers(
&self,
) -> &parking_lot::RwLock<Vec<Box<dyn EthSigner<Provider::Transaction>>>> {
&self.signers
}
#[inline]
pub const fn starting_block(&self) -> U256 {
self.starting_block
}
#[inline]
pub const fn network(&self) -> &Network {
&self.network
}
#[inline]
pub const fn eth_proof_window(&self) -> u64 {
self.eth_proof_window
}
#[inline]
pub const fn blocking_task_guard(&self) -> &BlockingTaskGuard {
&self.blocking_task_guard
}
#[inline]
pub fn subscribe_to_raw_transactions(&self) -> broadcast::Receiver<Bytes> {
self.raw_tx_sender.subscribe()
}
#[inline]
pub fn broadcast_raw_transaction(&self, raw_tx: Bytes) {
let _ = self.raw_tx_sender.send(raw_tx);
}
}
#[cfg(test)]
mod tests {
use crate::EthApi;
use alloy_consensus::Header;
use alloy_eips::BlockNumberOrTag;
use alloy_primitives::{PrimitiveSignature as Signature, B256, U64};
use alloy_rpc_types::FeeHistory;
use jsonrpsee_types::error::INVALID_PARAMS_CODE;
use reth_chainspec::{BaseFeeParams, ChainSpec};
use reth_evm_ethereum::EthEvmConfig;
use reth_network_api::noop::NoopNetwork;
use reth_primitives::{Block, BlockBody, TransactionSigned};
use reth_provider::{
test_utils::{MockEthProvider, NoopProvider},
BlockReader, BlockReaderIdExt, ChainSpecProvider, EvmEnvProvider, StateProviderFactory,
};
use reth_rpc_eth_api::EthApiServer;
use reth_rpc_eth_types::{
EthStateCache, FeeHistoryCache, FeeHistoryCacheConfig, GasCap, GasPriceOracle,
};
use reth_rpc_server_types::constants::{
DEFAULT_ETH_PROOF_WINDOW, DEFAULT_MAX_SIMULATE_BLOCKS, DEFAULT_PROOF_PERMITS,
};
use reth_tasks::pool::BlockingTaskPool;
use reth_testing_utils::{generators, generators::Rng};
use reth_transaction_pool::test_utils::{testing_pool, TestPool};
fn build_test_eth_api<
P: BlockReaderIdExt<
Block = reth_primitives::Block,
Receipt = reth_primitives::Receipt,
Header = reth_primitives::Header,
> + BlockReader
+ ChainSpecProvider<ChainSpec = ChainSpec>
+ EvmEnvProvider
+ StateProviderFactory
+ Unpin
+ Clone
+ 'static,
>(
provider: P,
) -> EthApi<P, TestPool, NoopNetwork, EthEvmConfig> {
let evm_config = EthEvmConfig::new(provider.chain_spec());
let cache = EthStateCache::spawn(provider.clone(), Default::default());
let fee_history_cache = FeeHistoryCache::new(FeeHistoryCacheConfig::default());
EthApi::new(
provider.clone(),
testing_pool(),
NoopNetwork::default(),
cache.clone(),
GasPriceOracle::new(provider, Default::default(), cache),
GasCap::default(),
DEFAULT_MAX_SIMULATE_BLOCKS,
DEFAULT_ETH_PROOF_WINDOW,
BlockingTaskPool::build().expect("failed to build tracing pool"),
fee_history_cache,
evm_config,
DEFAULT_PROOF_PERMITS,
)
}
fn prepare_eth_api(
newest_block: u64,
mut oldest_block: Option<B256>,
block_count: u64,
mock_provider: MockEthProvider,
) -> (EthApi<MockEthProvider, TestPool, NoopNetwork, EthEvmConfig>, Vec<u128>, Vec<f64>) {
let mut rng = generators::rng();
let mut gas_used_ratios = Vec::with_capacity(block_count as usize);
let mut base_fees_per_gas = Vec::with_capacity(block_count as usize);
let mut last_header = None;
let mut parent_hash = B256::default();
for i in (0..block_count).rev() {
let hash = rng.gen();
let gas_limit = rng.gen::<u32>() as u64;
let base_fee_per_gas: Option<u64> = rng.gen::<bool>().then(|| rng.gen::<u32>() as u64);
let gas_used = rng.gen::<u32>() as u64;
let header = Header {
number: newest_block - i,
gas_limit,
gas_used,
base_fee_per_gas: base_fee_per_gas.map(Into::into),
parent_hash,
..Default::default()
};
last_header = Some(header.clone());
parent_hash = hash;
const TOTAL_TRANSACTIONS: usize = 100;
let mut transactions = Vec::with_capacity(TOTAL_TRANSACTIONS);
for _ in 0..TOTAL_TRANSACTIONS {
let random_fee: u128 = rng.gen();
if let Some(base_fee_per_gas) = header.base_fee_per_gas {
let transaction = TransactionSigned::new_unhashed(
reth_primitives::Transaction::Eip1559(alloy_consensus::TxEip1559 {
max_priority_fee_per_gas: random_fee,
max_fee_per_gas: random_fee + base_fee_per_gas as u128,
..Default::default()
}),
Signature::test_signature(),
);
transactions.push(transaction);
} else {
let transaction = TransactionSigned::new_unhashed(
reth_primitives::Transaction::Legacy(Default::default()),
Signature::test_signature(),
);
transactions.push(transaction);
}
}
mock_provider.add_block(
hash,
Block {
header: header.clone(),
body: BlockBody { transactions, ..Default::default() },
},
);
mock_provider.add_header(hash, header);
oldest_block.get_or_insert(hash);
gas_used_ratios.push(gas_used as f64 / gas_limit as f64);
base_fees_per_gas.push(base_fee_per_gas.map(|fee| fee as u128).unwrap_or_default());
}
let last_header = last_header.unwrap();
base_fees_per_gas.push(BaseFeeParams::ethereum().next_block_base_fee(
last_header.gas_used,
last_header.gas_limit,
last_header.base_fee_per_gas.unwrap_or_default(),
) as u128);
let eth_api = build_test_eth_api(mock_provider);
(eth_api, base_fees_per_gas, gas_used_ratios)
}
#[tokio::test]
async fn test_fee_history_empty() {
let response = <EthApi<_, _, _, _> as EthApiServer<_, _, _, _>>::fee_history(
&build_test_eth_api(NoopProvider::default()),
U64::from(1),
BlockNumberOrTag::Latest,
None,
)
.await;
assert!(response.is_err());
let error_object = response.unwrap_err();
assert_eq!(error_object.code(), INVALID_PARAMS_CODE);
}
#[tokio::test]
async fn test_fee_history_invalid_block_range_before_genesis() {
let block_count = 10;
let newest_block = 1337;
let oldest_block = None;
let (eth_api, _, _) =
prepare_eth_api(newest_block, oldest_block, block_count, MockEthProvider::default());
let response = <EthApi<_, _, _, _> as EthApiServer<_, _, _, _>>::fee_history(
ð_api,
U64::from(newest_block + 1),
newest_block.into(),
Some(vec![10.0]),
)
.await;
assert!(response.is_err());
let error_object = response.unwrap_err();
assert_eq!(error_object.code(), INVALID_PARAMS_CODE);
}
#[tokio::test]
async fn test_fee_history_invalid_block_range_in_future() {
let block_count = 10;
let newest_block = 1337;
let oldest_block = None;
let (eth_api, _, _) =
prepare_eth_api(newest_block, oldest_block, block_count, MockEthProvider::default());
let response = <EthApi<_, _, _, _> as EthApiServer<_, _, _, _>>::fee_history(
ð_api,
U64::from(1),
(newest_block + 1000).into(),
Some(vec![10.0]),
)
.await;
assert!(response.is_err());
let error_object = response.unwrap_err();
assert_eq!(error_object.code(), INVALID_PARAMS_CODE);
}
#[tokio::test]
async fn test_fee_history_no_block_requested() {
let block_count = 10;
let newest_block = 1337;
let oldest_block = None;
let (eth_api, _, _) =
prepare_eth_api(newest_block, oldest_block, block_count, MockEthProvider::default());
let response = <EthApi<_, _, _, _> as EthApiServer<_, _, _, _>>::fee_history(
ð_api,
U64::from(0),
newest_block.into(),
None,
)
.await
.unwrap();
assert_eq!(
response,
FeeHistory::default(),
"none: requesting no block should yield a default response"
);
}
#[tokio::test]
async fn test_fee_history_single_block() {
let block_count = 10;
let newest_block = 1337;
let oldest_block = None;
let (eth_api, base_fees_per_gas, gas_used_ratios) =
prepare_eth_api(newest_block, oldest_block, block_count, MockEthProvider::default());
let fee_history =
eth_api.fee_history(U64::from(1), newest_block.into(), None).await.unwrap();
assert_eq!(
fee_history.base_fee_per_gas,
&base_fees_per_gas[base_fees_per_gas.len() - 2..],
"one: base fee per gas is incorrect"
);
assert_eq!(
fee_history.base_fee_per_gas.len(),
2,
"one: should return base fee of the next block as well"
);
assert_eq!(
&fee_history.gas_used_ratio,
&gas_used_ratios[gas_used_ratios.len() - 1..],
"one: gas used ratio is incorrect"
);
assert_eq!(fee_history.oldest_block, newest_block, "one: oldest block is incorrect");
assert!(
fee_history.reward.is_none(),
"one: no percentiles were requested, so there should be no rewards result"
);
}
#[tokio::test]
async fn test_fee_history_all_blocks() {
let block_count = 10;
let newest_block = 1337;
let oldest_block = None;
let (eth_api, base_fees_per_gas, gas_used_ratios) =
prepare_eth_api(newest_block, oldest_block, block_count, MockEthProvider::default());
let fee_history =
eth_api.fee_history(U64::from(block_count), newest_block.into(), None).await.unwrap();
assert_eq!(
&fee_history.base_fee_per_gas, &base_fees_per_gas,
"all: base fee per gas is incorrect"
);
assert_eq!(
fee_history.base_fee_per_gas.len() as u64,
block_count + 1,
"all: should return base fee of the next block as well"
);
assert_eq!(
&fee_history.gas_used_ratio, &gas_used_ratios,
"all: gas used ratio is incorrect"
);
assert_eq!(
fee_history.oldest_block,
newest_block - block_count + 1,
"all: oldest block is incorrect"
);
assert!(
fee_history.reward.is_none(),
"all: no percentiles were requested, so there should be no rewards result"
);
}
}