use std::sync::Arc;
use derive_more::Deref;
use futures::Future;
use reth_node_api::{BuilderProvider, FullNodeComponents};
use reth_primitives::{BlockNumberOrTag, U256};
use reth_provider::{BlockReaderIdExt, CanonStateSubscriptions, ChainSpecProvider};
use reth_rpc_eth_api::{
helpers::{transaction::UpdateRawTxForwarder, EthSigner, SpawnBlocking},
EthApiTypes, RawTransactionForwarder,
};
use reth_rpc_eth_types::{
EthApiBuilderCtx, EthApiError, EthStateCache, FeeHistoryCache, GasCap, GasPriceOracle,
PendingBlock,
};
use reth_tasks::{
pool::{BlockingTaskGuard, BlockingTaskPool},
TaskExecutor, TaskSpawner, TokioTaskExecutor,
};
use tokio::sync::{AcquireError, Mutex, OwnedSemaphorePermit};
#[derive(Deref)]
pub struct EthApi<Provider, Pool, Network, EvmConfig> {
pub(super) inner: Arc<EthApiInner<Provider, Pool, Network, EvmConfig>>,
}
impl<Provider, Pool, Network, EvmConfig> EthApi<Provider, Pool, Network, EvmConfig>
where
Provider: BlockReaderIdExt,
{
#[allow(clippy::too_many_arguments)]
pub fn new(
provider: Provider,
pool: Pool,
network: Network,
eth_cache: EthStateCache,
gas_oracle: GasPriceOracle<Provider>,
gas_cap: impl Into<GasCap>,
eth_proof_window: u64,
blocking_task_pool: BlockingTaskPool,
fee_history_cache: FeeHistoryCache,
evm_config: EvmConfig,
raw_transaction_forwarder: Option<Arc<dyn RawTransactionForwarder>>,
proof_permits: usize,
) -> Self {
let inner = EthApiInner::new(
provider,
pool,
network,
eth_cache,
gas_oracle,
gas_cap,
eth_proof_window,
blocking_task_pool,
fee_history_cache,
evm_config,
TokioTaskExecutor::default(),
raw_transaction_forwarder,
proof_permits,
);
Self { inner: Arc::new(inner) }
}
}
impl<Provider, Pool, EvmConfig, Network> EthApi<Provider, Pool, Network, EvmConfig>
where
Provider: ChainSpecProvider + BlockReaderIdExt + Clone + 'static,
Pool: Clone,
EvmConfig: Clone,
Network: Clone,
{
pub fn with_spawner<Tasks, Events>(
ctx: &EthApiBuilderCtx<Provider, Pool, EvmConfig, Network, Tasks, Events>,
) -> Self
where
Tasks: TaskSpawner + Clone + 'static,
Events: CanonStateSubscriptions,
{
let blocking_task_pool =
BlockingTaskPool::build().expect("failed to build blocking task pool");
let inner = EthApiInner::new(
ctx.provider.clone(),
ctx.pool.clone(),
ctx.network.clone(),
ctx.cache.clone(),
ctx.new_gas_price_oracle(),
ctx.config.rpc_gas_cap,
ctx.config.eth_proof_window,
blocking_task_pool,
ctx.new_fee_history_cache(),
ctx.evm_config.clone(),
ctx.executor.clone(),
None,
ctx.config.proof_permits,
);
Self { inner: Arc::new(inner) }
}
}
impl<Provider, Pool, Network, EvmConfig> EthApiTypes for EthApi<Provider, Pool, Network, EvmConfig>
where
Self: Send + Sync,
{
type Error = EthApiError;
}
impl<Provider, Pool, Network, EvmConfig> std::fmt::Debug
for EthApi<Provider, Pool, Network, EvmConfig>
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("EthApi").finish_non_exhaustive()
}
}
impl<Provider, Pool, Network, EvmConfig> Clone for EthApi<Provider, Pool, Network, EvmConfig> {
fn clone(&self) -> Self {
Self { inner: Arc::clone(&self.inner) }
}
}
impl<Provider, Pool, Network, EvmConfig> SpawnBlocking
for EthApi<Provider, Pool, Network, EvmConfig>
where
Self: EthApiTypes + Clone + Send + Sync + 'static,
{
#[inline]
fn io_task_spawner(&self) -> impl reth_tasks::TaskSpawner {
self.inner.task_spawner()
}
#[inline]
fn tracing_task_pool(&self) -> &reth_tasks::pool::BlockingTaskPool {
self.inner.blocking_task_pool()
}
fn acquire_owned(
&self,
) -> impl Future<Output = Result<OwnedSemaphorePermit, AcquireError>> + Send {
self.blocking_task_guard.clone().acquire_owned()
}
fn acquire_many_owned(
&self,
n: u32,
) -> impl Future<Output = Result<OwnedSemaphorePermit, AcquireError>> + Send {
self.blocking_task_guard.clone().acquire_many_owned(n)
}
}
impl<N, Network> BuilderProvider<N> for EthApi<N::Provider, N::Pool, Network, N::Evm>
where
N: FullNodeComponents,
Network: Send + Sync + Clone + 'static,
{
type Ctx<'a> =
&'a EthApiBuilderCtx<N::Provider, N::Pool, N::Evm, Network, TaskExecutor, N::Provider>;
fn builder() -> Box<dyn for<'a> Fn(Self::Ctx<'a>) -> Self + Send> {
Box::new(|ctx| Self::with_spawner(ctx))
}
}
#[allow(missing_debug_implementations)]
pub struct EthApiInner<Provider, Pool, Network, EvmConfig> {
pool: Pool,
provider: Provider,
network: Network,
signers: parking_lot::RwLock<Vec<Box<dyn EthSigner>>>,
eth_cache: EthStateCache,
gas_oracle: GasPriceOracle<Provider>,
gas_cap: u64,
eth_proof_window: u64,
starting_block: U256,
task_spawner: Box<dyn TaskSpawner>,
pending_block: Mutex<Option<PendingBlock>>,
blocking_task_pool: BlockingTaskPool,
fee_history_cache: FeeHistoryCache,
evm_config: EvmConfig,
raw_transaction_forwarder: parking_lot::RwLock<Option<Arc<dyn RawTransactionForwarder>>>,
blocking_task_guard: BlockingTaskGuard,
}
impl<Provider, Pool, Network, EvmConfig> EthApiInner<Provider, Pool, Network, EvmConfig>
where
Provider: BlockReaderIdExt,
{
#[allow(clippy::too_many_arguments)]
pub fn new(
provider: Provider,
pool: Pool,
network: Network,
eth_cache: EthStateCache,
gas_oracle: GasPriceOracle<Provider>,
gas_cap: impl Into<GasCap>,
eth_proof_window: u64,
blocking_task_pool: BlockingTaskPool,
fee_history_cache: FeeHistoryCache,
evm_config: EvmConfig,
task_spawner: impl TaskSpawner + 'static,
raw_transaction_forwarder: Option<Arc<dyn RawTransactionForwarder>>,
proof_permits: usize,
) -> Self {
let signers = parking_lot::RwLock::new(Default::default());
let starting_block = U256::from(
provider
.header_by_number_or_tag(BlockNumberOrTag::Latest)
.ok()
.flatten()
.map(|header| header.number)
.unwrap_or_default(),
);
Self {
provider,
pool,
network,
signers,
eth_cache,
gas_oracle,
gas_cap: gas_cap.into().into(),
eth_proof_window,
starting_block,
task_spawner: Box::new(task_spawner),
pending_block: Default::default(),
blocking_task_pool,
fee_history_cache,
evm_config,
raw_transaction_forwarder: parking_lot::RwLock::new(raw_transaction_forwarder),
blocking_task_guard: BlockingTaskGuard::new(proof_permits),
}
}
}
impl<Provider, Pool, Network, EvmConfig> EthApiInner<Provider, Pool, Network, EvmConfig> {
#[inline]
pub const fn provider(&self) -> &Provider {
&self.provider
}
#[inline]
pub const fn cache(&self) -> &EthStateCache {
&self.eth_cache
}
#[inline]
pub const fn pending_block(&self) -> &Mutex<Option<PendingBlock>> {
&self.pending_block
}
#[inline]
pub const fn task_spawner(&self) -> &dyn TaskSpawner {
&*self.task_spawner
}
#[inline]
pub const fn blocking_task_pool(&self) -> &BlockingTaskPool {
&self.blocking_task_pool
}
#[inline]
pub const fn evm_config(&self) -> &EvmConfig {
&self.evm_config
}
#[inline]
pub const fn pool(&self) -> &Pool {
&self.pool
}
#[inline]
pub fn raw_tx_forwarder(&self) -> Option<Arc<dyn RawTransactionForwarder>> {
self.raw_transaction_forwarder.read().clone()
}
#[inline]
pub const fn gas_cap(&self) -> u64 {
self.gas_cap
}
#[inline]
pub const fn gas_oracle(&self) -> &GasPriceOracle<Provider> {
&self.gas_oracle
}
#[inline]
pub const fn fee_history_cache(&self) -> &FeeHistoryCache {
&self.fee_history_cache
}
#[inline]
pub const fn signers(&self) -> &parking_lot::RwLock<Vec<Box<dyn EthSigner>>> {
&self.signers
}
#[inline]
pub const fn starting_block(&self) -> U256 {
self.starting_block
}
#[inline]
pub const fn network(&self) -> &Network {
&self.network
}
#[inline]
pub const fn eth_proof_window(&self) -> u64 {
self.eth_proof_window
}
}
impl<Provider, Pool, Network, EvmConfig> UpdateRawTxForwarder
for EthApiInner<Provider, Pool, Network, EvmConfig>
{
fn set_eth_raw_transaction_forwarder(&self, forwarder: Arc<dyn RawTransactionForwarder>) {
self.raw_transaction_forwarder.write().replace(forwarder);
}
}
#[cfg(test)]
mod tests {
use jsonrpsee_types::error::INVALID_PARAMS_CODE;
use reth_chainspec::BaseFeeParams;
use reth_evm_ethereum::EthEvmConfig;
use reth_network_api::noop::NoopNetwork;
use reth_primitives::{Block, BlockNumberOrTag, Header, TransactionSigned, B256, U64};
use reth_provider::{
test_utils::{MockEthProvider, NoopProvider},
BlockReader, BlockReaderIdExt, ChainSpecProvider, EvmEnvProvider, StateProviderFactory,
};
use reth_rpc_eth_api::EthApiServer;
use reth_rpc_eth_types::{
EthStateCache, FeeHistoryCache, FeeHistoryCacheConfig, GasPriceOracle,
};
use reth_rpc_server_types::constants::{DEFAULT_ETH_PROOF_WINDOW, DEFAULT_PROOF_PERMITS};
use reth_rpc_types::FeeHistory;
use reth_tasks::pool::BlockingTaskPool;
use reth_testing_utils::{generators, generators::Rng};
use reth_transaction_pool::test_utils::{testing_pool, TestPool};
use crate::EthApi;
fn build_test_eth_api<
P: BlockReaderIdExt
+ BlockReader
+ ChainSpecProvider
+ EvmEnvProvider
+ StateProviderFactory
+ Unpin
+ Clone
+ 'static,
>(
provider: P,
) -> EthApi<P, TestPool, NoopNetwork, EthEvmConfig> {
let evm_config = EthEvmConfig::default();
let cache = EthStateCache::spawn(provider.clone(), Default::default(), evm_config);
let fee_history_cache =
FeeHistoryCache::new(cache.clone(), FeeHistoryCacheConfig::default());
let gas_cap = provider.chain_spec().max_gas_limit;
EthApi::new(
provider.clone(),
testing_pool(),
NoopNetwork::default(),
cache.clone(),
GasPriceOracle::new(provider, Default::default(), cache),
gas_cap,
DEFAULT_ETH_PROOF_WINDOW,
BlockingTaskPool::build().expect("failed to build tracing pool"),
fee_history_cache,
evm_config,
None,
DEFAULT_PROOF_PERMITS,
)
}
fn prepare_eth_api(
newest_block: u64,
mut oldest_block: Option<B256>,
block_count: u64,
mock_provider: MockEthProvider,
) -> (EthApi<MockEthProvider, TestPool, NoopNetwork, EthEvmConfig>, Vec<u128>, Vec<f64>) {
let mut rng = generators::rng();
let mut gas_used_ratios = Vec::new();
let mut base_fees_per_gas = Vec::new();
let mut last_header = None;
let mut parent_hash = B256::default();
for i in (0..block_count).rev() {
let hash = rng.gen();
let gas_limit: u64 = rng.gen();
let gas_used: u64 = rng.gen();
let base_fee_per_gas: Option<u64> = rng.gen::<bool>().then(|| rng.gen::<u32>() as u64);
let header = Header {
number: newest_block - i,
gas_limit,
gas_used,
base_fee_per_gas,
parent_hash,
..Default::default()
};
last_header = Some(header.clone());
parent_hash = hash;
let mut transactions = vec![];
for _ in 0..100 {
let random_fee: u128 = rng.gen();
if let Some(base_fee_per_gas) = header.base_fee_per_gas {
let transaction = TransactionSigned {
transaction: reth_primitives::Transaction::Eip1559(
reth_primitives::TxEip1559 {
max_priority_fee_per_gas: random_fee,
max_fee_per_gas: random_fee + base_fee_per_gas as u128,
..Default::default()
},
),
..Default::default()
};
transactions.push(transaction);
} else {
let transaction = TransactionSigned {
transaction: reth_primitives::Transaction::Legacy(Default::default()),
..Default::default()
};
transactions.push(transaction);
}
}
mock_provider.add_block(
hash,
Block { header: header.clone(), body: transactions, ..Default::default() },
);
mock_provider.add_header(hash, header);
oldest_block.get_or_insert(hash);
gas_used_ratios.push(gas_used as f64 / gas_limit as f64);
base_fees_per_gas.push(base_fee_per_gas.map(|fee| fee as u128).unwrap_or_default());
}
let last_header = last_header.unwrap();
base_fees_per_gas.push(BaseFeeParams::ethereum().next_block_base_fee(
last_header.gas_used as u128,
last_header.gas_limit as u128,
last_header.base_fee_per_gas.unwrap_or_default() as u128,
));
let eth_api = build_test_eth_api(mock_provider);
(eth_api, base_fees_per_gas, gas_used_ratios)
}
#[tokio::test]
async fn test_fee_history_empty() {
let response = <EthApi<_, _, _, _> as EthApiServer>::fee_history(
&build_test_eth_api(NoopProvider::default()),
U64::from(1),
BlockNumberOrTag::Latest,
None,
)
.await;
assert!(response.is_err());
let error_object = response.unwrap_err();
assert_eq!(error_object.code(), INVALID_PARAMS_CODE);
}
#[tokio::test]
async fn test_fee_history_invalid_block_range_before_genesis() {
let block_count = 10;
let newest_block = 1337;
let oldest_block = None;
let (eth_api, _, _) =
prepare_eth_api(newest_block, oldest_block, block_count, MockEthProvider::default());
let response = <EthApi<_, _, _, _> as EthApiServer>::fee_history(
ð_api,
U64::from(newest_block + 1),
newest_block.into(),
Some(vec![10.0]),
)
.await;
assert!(response.is_err());
let error_object = response.unwrap_err();
assert_eq!(error_object.code(), INVALID_PARAMS_CODE);
}
#[tokio::test]
async fn test_fee_history_invalid_block_range_in_future() {
let block_count = 10;
let newest_block = 1337;
let oldest_block = None;
let (eth_api, _, _) =
prepare_eth_api(newest_block, oldest_block, block_count, MockEthProvider::default());
let response = <EthApi<_, _, _, _> as EthApiServer>::fee_history(
ð_api,
U64::from(1),
(newest_block + 1000).into(),
Some(vec![10.0]),
)
.await;
assert!(response.is_err());
let error_object = response.unwrap_err();
assert_eq!(error_object.code(), INVALID_PARAMS_CODE);
}
#[tokio::test]
async fn test_fee_history_no_block_requested() {
let block_count = 10;
let newest_block = 1337;
let oldest_block = None;
let (eth_api, _, _) =
prepare_eth_api(newest_block, oldest_block, block_count, MockEthProvider::default());
let response = <EthApi<_, _, _, _> as EthApiServer>::fee_history(
ð_api,
U64::from(0),
newest_block.into(),
None,
)
.await
.unwrap();
assert_eq!(
response,
FeeHistory::default(),
"none: requesting no block should yield a default response"
);
}
#[tokio::test]
async fn test_fee_history_single_block() {
let block_count = 10;
let newest_block = 1337;
let oldest_block = None;
let (eth_api, base_fees_per_gas, gas_used_ratios) =
prepare_eth_api(newest_block, oldest_block, block_count, MockEthProvider::default());
let fee_history =
eth_api.fee_history(U64::from(1), newest_block.into(), None).await.unwrap();
assert_eq!(
fee_history.base_fee_per_gas,
&base_fees_per_gas[base_fees_per_gas.len() - 2..],
"one: base fee per gas is incorrect"
);
assert_eq!(
fee_history.base_fee_per_gas.len(),
2,
"one: should return base fee of the next block as well"
);
assert_eq!(
&fee_history.gas_used_ratio,
&gas_used_ratios[gas_used_ratios.len() - 1..],
"one: gas used ratio is incorrect"
);
assert_eq!(fee_history.oldest_block, newest_block, "one: oldest block is incorrect");
assert!(
fee_history.reward.is_none(),
"one: no percentiles were requested, so there should be no rewards result"
);
}
#[tokio::test]
async fn test_fee_history_all_blocks() {
let block_count = 10;
let newest_block = 1337;
let oldest_block = None;
let (eth_api, base_fees_per_gas, gas_used_ratios) =
prepare_eth_api(newest_block, oldest_block, block_count, MockEthProvider::default());
let fee_history =
eth_api.fee_history(U64::from(block_count), newest_block.into(), None).await.unwrap();
assert_eq!(
&fee_history.base_fee_per_gas, &base_fees_per_gas,
"all: base fee per gas is incorrect"
);
assert_eq!(
fee_history.base_fee_per_gas.len() as u64,
block_count + 1,
"all: should return base fee of the next block as well"
);
assert_eq!(
&fee_history.gas_used_ratio, &gas_used_ratios,
"all: gas used ratio is incorrect"
);
assert_eq!(
fee_history.oldest_block,
newest_block - block_count + 1,
"all: oldest block is incorrect"
);
assert!(
fee_history.reward.is_none(),
"all: no percentiles were requested, so there should be no rewards result"
);
}
}