use std::sync::Arc;
use alloy_primitives::TxHash;
use alloy_rpc_types_eth::{
pubsub::{
Params, PubSubSyncStatus, SubscriptionKind, SubscriptionResult as EthSubscriptionResult,
SyncStatusMetadata,
},
FilteredParams, Header, Log,
};
use futures::StreamExt;
use jsonrpsee::{
server::SubscriptionMessage, types::ErrorObject, PendingSubscriptionSink, SubscriptionSink,
};
use reth_network_api::NetworkInfo;
use reth_provider::{BlockReader, CanonStateSubscriptions, EvmEnvProvider};
use reth_rpc_eth_api::{pubsub::EthPubSubApiServer, TransactionCompat};
use reth_rpc_eth_types::logs_utils;
use reth_rpc_server_types::result::{internal_rpc_err, invalid_params_rpc_err};
use reth_rpc_types_compat::transaction::from_recovered;
use reth_tasks::{TaskSpawner, TokioTaskExecutor};
use reth_transaction_pool::{NewTransactionEvent, TransactionPool};
use serde::Serialize;
use tokio_stream::{
wrappers::{BroadcastStream, ReceiverStream},
Stream,
};
use tracing::error;
#[derive(Clone)]
pub struct EthPubSub<Provider, Pool, Events, Network, Eth> {
inner: Arc<EthPubSubInner<Provider, Pool, Events, Network>>,
subscription_task_spawner: Box<dyn TaskSpawner>,
tx_resp_builder: Eth,
}
impl<Provider, Pool, Events, Network, Eth> EthPubSub<Provider, Pool, Events, Network, Eth> {
pub fn new(
provider: Provider,
pool: Pool,
chain_events: Events,
network: Network,
tx_resp_builder: Eth,
) -> Self {
Self::with_spawner(
provider,
pool,
chain_events,
network,
Box::<TokioTaskExecutor>::default(),
tx_resp_builder,
)
}
pub fn with_spawner(
provider: Provider,
pool: Pool,
chain_events: Events,
network: Network,
subscription_task_spawner: Box<dyn TaskSpawner>,
tx_resp_builder: Eth,
) -> Self {
let inner = EthPubSubInner { provider, pool, chain_events, network };
Self { inner: Arc::new(inner), subscription_task_spawner, tx_resp_builder }
}
}
#[async_trait::async_trait]
impl<Provider, Pool, Events, Network, Eth> EthPubSubApiServer<Eth::Transaction>
for EthPubSub<Provider, Pool, Events, Network, Eth>
where
Provider: BlockReader + EvmEnvProvider + Clone + 'static,
Pool: TransactionPool + 'static,
Events: CanonStateSubscriptions + Clone + 'static,
Network: NetworkInfo + Clone + 'static,
Eth: TransactionCompat + 'static,
{
async fn subscribe(
&self,
pending: PendingSubscriptionSink,
kind: SubscriptionKind,
params: Option<Params>,
) -> jsonrpsee::core::SubscriptionResult {
let sink = pending.accept().await?;
let pubsub = self.inner.clone();
let resp_builder = self.tx_resp_builder.clone();
self.subscription_task_spawner.spawn(Box::pin(async move {
let _ = handle_accepted(pubsub, sink, kind, params, resp_builder).await;
}));
Ok(())
}
}
async fn handle_accepted<Provider, Pool, Events, Network, Eth>(
pubsub: Arc<EthPubSubInner<Provider, Pool, Events, Network>>,
accepted_sink: SubscriptionSink,
kind: SubscriptionKind,
params: Option<Params>,
tx_resp_builder: Eth,
) -> Result<(), ErrorObject<'static>>
where
Provider: BlockReader + EvmEnvProvider + Clone + 'static,
Pool: TransactionPool + 'static,
Events: CanonStateSubscriptions + Clone + 'static,
Network: NetworkInfo + Clone + 'static,
Eth: TransactionCompat,
{
match kind {
SubscriptionKind::NewHeads => {
let stream = pubsub
.new_headers_stream()
.map(|header| EthSubscriptionResult::<()>::Header(Box::new(header.into())));
pipe_from_stream(accepted_sink, stream).await
}
SubscriptionKind::Logs => {
let filter = match params {
Some(Params::Logs(filter)) => FilteredParams::new(Some(*filter)),
Some(Params::Bool(_)) => {
return Err(invalid_params_rpc_err("Invalid params for logs"))
}
_ => FilteredParams::default(),
};
let stream = pubsub
.log_stream(filter)
.map(|log| EthSubscriptionResult::<()>::Log(Box::new(log)));
pipe_from_stream(accepted_sink, stream).await
}
SubscriptionKind::NewPendingTransactions => {
if let Some(params) = params {
match params {
Params::Bool(true) => {
let stream = pubsub.full_pending_transaction_stream().filter_map(|tx| {
let tx_value = match from_recovered(
tx.transaction.to_recovered_transaction(),
&tx_resp_builder,
) {
Ok(tx) => {
Some(EthSubscriptionResult::FullTransaction(Box::new(tx)))
}
Err(err) => {
error!(target = "rpc",
%err,
"Failed to fill transaction with block context"
);
None
}
};
std::future::ready(tx_value)
});
return pipe_from_stream(accepted_sink, stream).await
}
Params::Bool(false) | Params::None => {
}
Params::Logs(_) => {
return Err(invalid_params_rpc_err(
"Invalid params for newPendingTransactions",
))
}
}
}
let stream = pubsub
.pending_transaction_hashes_stream()
.map(EthSubscriptionResult::<()>::TransactionHash);
pipe_from_stream(accepted_sink, stream).await
}
SubscriptionKind::Syncing => {
let mut canon_state =
BroadcastStream::new(pubsub.chain_events.subscribe_to_canonical_state());
let mut initial_sync_status = pubsub.network.is_syncing();
let current_sub_res = pubsub.sync_status(initial_sync_status);
let msg = SubscriptionMessage::from_json(¤t_sub_res)
.map_err(SubscriptionSerializeError::new)?;
if accepted_sink.send(msg).await.is_err() {
return Ok(())
}
while canon_state.next().await.is_some() {
let current_syncing = pubsub.network.is_syncing();
if current_syncing != initial_sync_status {
initial_sync_status = current_syncing;
let sync_status = pubsub.sync_status(current_syncing);
let msg = SubscriptionMessage::from_json(&sync_status)
.map_err(SubscriptionSerializeError::new)?;
if accepted_sink.send(msg).await.is_err() {
break
}
}
}
Ok(())
}
}
}
#[derive(Debug, thiserror::Error)]
#[error("Failed to serialize subscription item: {0}")]
pub struct SubscriptionSerializeError(#[from] serde_json::Error);
impl SubscriptionSerializeError {
const fn new(err: serde_json::Error) -> Self {
Self(err)
}
}
impl From<SubscriptionSerializeError> for ErrorObject<'static> {
fn from(value: SubscriptionSerializeError) -> Self {
internal_rpc_err(value.to_string())
}
}
async fn pipe_from_stream<T, St>(
sink: SubscriptionSink,
mut stream: St,
) -> Result<(), ErrorObject<'static>>
where
St: Stream<Item = T> + Unpin,
T: Serialize,
{
loop {
tokio::select! {
_ = sink.closed() => {
break Ok(())
},
maybe_item = stream.next() => {
let item = match maybe_item {
Some(item) => item,
None => {
break Ok(())
},
};
let msg = SubscriptionMessage::from_json(&item).map_err(SubscriptionSerializeError::new)?;
if sink.send(msg).await.is_err() {
break Ok(());
}
}
}
}
}
impl<Provider, Pool, Events, Network, Eth> std::fmt::Debug
for EthPubSub<Provider, Pool, Events, Network, Eth>
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("EthPubSub").finish_non_exhaustive()
}
}
#[derive(Clone)]
struct EthPubSubInner<Provider, Pool, Events, Network> {
pool: Pool,
provider: Provider,
chain_events: Events,
network: Network,
}
impl<Provider, Pool, Events, Network> EthPubSubInner<Provider, Pool, Events, Network>
where
Provider: BlockReader + 'static,
{
fn sync_status(&self, is_syncing: bool) -> EthSubscriptionResult {
if is_syncing {
let current_block =
self.provider.chain_info().map(|info| info.best_number).unwrap_or_default();
EthSubscriptionResult::SyncState(PubSubSyncStatus::Detailed(SyncStatusMetadata {
syncing: true,
starting_block: 0,
current_block,
highest_block: Some(current_block),
}))
} else {
EthSubscriptionResult::SyncState(PubSubSyncStatus::Simple(false))
}
}
}
impl<Provider, Pool, Events, Network> EthPubSubInner<Provider, Pool, Events, Network>
where
Pool: TransactionPool + 'static,
{
fn pending_transaction_hashes_stream(&self) -> impl Stream<Item = TxHash> {
ReceiverStream::new(self.pool.pending_transactions_listener())
}
fn full_pending_transaction_stream(
&self,
) -> impl Stream<Item = NewTransactionEvent<<Pool as TransactionPool>::Transaction>> {
self.pool.new_pending_pool_transactions_listener()
}
}
impl<Provider, Pool, Events, Network> EthPubSubInner<Provider, Pool, Events, Network>
where
Provider: BlockReader + EvmEnvProvider + 'static,
Events: CanonStateSubscriptions + 'static,
Network: NetworkInfo + 'static,
Pool: 'static,
{
fn new_headers_stream(&self) -> impl Stream<Item = Header> {
self.chain_events.canonical_state_stream().flat_map(|new_chain| {
let headers = new_chain.committed().headers().collect::<Vec<_>>();
futures::stream::iter(
headers.into_iter().map(|h| Header::from_consensus(h.into(), None, None)),
)
})
}
fn log_stream(&self, filter: FilteredParams) -> impl Stream<Item = Log> {
BroadcastStream::new(self.chain_events.subscribe_to_canonical_state())
.map(move |canon_state| {
canon_state.expect("new block subscription never ends").block_receipts()
})
.flat_map(futures::stream::iter)
.flat_map(move |(block_receipts, removed)| {
let all_logs = logs_utils::matching_block_logs_with_tx_hashes(
&filter,
block_receipts.block,
block_receipts.tx_receipts.iter().map(|(tx, receipt)| (*tx, receipt)),
removed,
);
futures::stream::iter(all_logs)
})
}
}