1use std::sync::Arc;
4
5use alloy_primitives::TxHash;
6use alloy_rpc_types_eth::{
7 pubsub::{Params, PubSubSyncStatus, SubscriptionKind, SyncStatusMetadata},
8 FilteredParams, Header, Log,
9};
10use futures::StreamExt;
11use jsonrpsee::{
12 server::SubscriptionMessage, types::ErrorObject, PendingSubscriptionSink, SubscriptionSink,
13};
14use reth_chain_state::CanonStateSubscriptions;
15use reth_network_api::NetworkInfo;
16use reth_primitives_traits::NodePrimitives;
17use reth_rpc_eth_api::{
18 pubsub::EthPubSubApiServer, EthApiTypes, RpcNodeCore, RpcTransaction, TransactionCompat,
19};
20use reth_rpc_eth_types::logs_utils;
21use reth_rpc_server_types::result::{internal_rpc_err, invalid_params_rpc_err};
22use reth_storage_api::BlockNumReader;
23use reth_tasks::{TaskSpawner, TokioTaskExecutor};
24use reth_transaction_pool::{NewTransactionEvent, PoolConsensusTx, TransactionPool};
25use serde::Serialize;
26use tokio_stream::{
27 wrappers::{BroadcastStream, ReceiverStream},
28 Stream,
29};
30use tracing::error;
31
32#[derive(Clone)]
36pub struct EthPubSub<Eth> {
37 inner: Arc<EthPubSubInner<Eth>>,
39 subscription_task_spawner: Box<dyn TaskSpawner>,
41}
42
43impl<Eth> EthPubSub<Eth> {
46 pub fn new(eth_api: Eth) -> Self {
50 Self::with_spawner(eth_api, Box::<TokioTaskExecutor>::default())
51 }
52
53 pub fn with_spawner(eth_api: Eth, subscription_task_spawner: Box<dyn TaskSpawner>) -> Self {
55 let inner = EthPubSubInner { eth_api };
56 Self { inner: Arc::new(inner), subscription_task_spawner }
57 }
58}
59
60#[async_trait::async_trait]
61impl<Eth> EthPubSubApiServer<RpcTransaction<Eth::NetworkTypes>> for EthPubSub<Eth>
62where
63 Eth: RpcNodeCore<
64 Provider: BlockNumReader + CanonStateSubscriptions,
65 Pool: TransactionPool,
66 Network: NetworkInfo,
67 > + EthApiTypes<TransactionCompat: TransactionCompat<PoolConsensusTx<Eth::Pool>>>
68 + 'static,
69{
70 async fn subscribe(
72 &self,
73 pending: PendingSubscriptionSink,
74 kind: SubscriptionKind,
75 params: Option<Params>,
76 ) -> jsonrpsee::core::SubscriptionResult {
77 let sink = pending.accept().await?;
78 let pubsub = self.inner.clone();
79 self.subscription_task_spawner.spawn(Box::pin(async move {
80 let _ = handle_accepted(pubsub, sink, kind, params).await;
81 }));
82
83 Ok(())
84 }
85}
86
87async fn handle_accepted<Eth>(
89 pubsub: Arc<EthPubSubInner<Eth>>,
90 accepted_sink: SubscriptionSink,
91 kind: SubscriptionKind,
92 params: Option<Params>,
93) -> Result<(), ErrorObject<'static>>
94where
95 Eth: RpcNodeCore<
96 Provider: BlockNumReader + CanonStateSubscriptions,
97 Pool: TransactionPool,
98 Network: NetworkInfo,
99 > + EthApiTypes<TransactionCompat: TransactionCompat<PoolConsensusTx<Eth::Pool>>>,
100{
101 match kind {
102 SubscriptionKind::NewHeads => {
103 pipe_from_stream(accepted_sink, pubsub.new_headers_stream()).await
104 }
105 SubscriptionKind::Logs => {
106 let filter = match params {
108 Some(Params::Logs(filter)) => FilteredParams::new(Some(*filter)),
109 Some(Params::Bool(_)) => {
110 return Err(invalid_params_rpc_err("Invalid params for logs"))
111 }
112 _ => FilteredParams::default(),
113 };
114 pipe_from_stream(accepted_sink, pubsub.log_stream(filter)).await
115 }
116 SubscriptionKind::NewPendingTransactions => {
117 if let Some(params) = params {
118 match params {
119 Params::Bool(true) => {
120 let stream = pubsub.full_pending_transaction_stream().filter_map(|tx| {
122 let tx_value = match pubsub
123 .eth_api
124 .tx_resp_builder()
125 .fill_pending(tx.transaction.to_consensus())
126 {
127 Ok(tx) => Some(tx),
128 Err(err) => {
129 error!(target = "rpc",
130 %err,
131 "Failed to fill transaction with block context"
132 );
133 None
134 }
135 };
136 std::future::ready(tx_value)
137 });
138 return pipe_from_stream(accepted_sink, stream).await
139 }
140 Params::Bool(false) | Params::None => {
141 }
143 Params::Logs(_) => {
144 return Err(invalid_params_rpc_err(
145 "Invalid params for newPendingTransactions",
146 ))
147 }
148 }
149 }
150
151 pipe_from_stream(accepted_sink, pubsub.pending_transaction_hashes_stream()).await
152 }
153 SubscriptionKind::Syncing => {
154 let mut canon_state =
156 BroadcastStream::new(pubsub.eth_api.provider().subscribe_to_canonical_state());
157 let mut initial_sync_status = pubsub.eth_api.network().is_syncing();
159 let current_sub_res = pubsub.sync_status(initial_sync_status);
160
161 let msg = SubscriptionMessage::from_json(¤t_sub_res)
163 .map_err(SubscriptionSerializeError::new)?;
164 if accepted_sink.send(msg).await.is_err() {
165 return Ok(())
166 }
167
168 while canon_state.next().await.is_some() {
169 let current_syncing = pubsub.eth_api.network().is_syncing();
170 if current_syncing != initial_sync_status {
172 initial_sync_status = current_syncing;
174
175 let sync_status = pubsub.sync_status(current_syncing);
177 let msg = SubscriptionMessage::from_json(&sync_status)
178 .map_err(SubscriptionSerializeError::new)?;
179 if accepted_sink.send(msg).await.is_err() {
180 break
181 }
182 }
183 }
184
185 Ok(())
186 }
187 }
188}
189
190#[derive(Debug, thiserror::Error)]
192#[error("Failed to serialize subscription item: {0}")]
193pub struct SubscriptionSerializeError(#[from] serde_json::Error);
194
195impl SubscriptionSerializeError {
196 const fn new(err: serde_json::Error) -> Self {
197 Self(err)
198 }
199}
200
201impl From<SubscriptionSerializeError> for ErrorObject<'static> {
202 fn from(value: SubscriptionSerializeError) -> Self {
203 internal_rpc_err(value.to_string())
204 }
205}
206
207async fn pipe_from_stream<T, St>(
209 sink: SubscriptionSink,
210 mut stream: St,
211) -> Result<(), ErrorObject<'static>>
212where
213 St: Stream<Item = T> + Unpin,
214 T: Serialize,
215{
216 loop {
217 tokio::select! {
218 _ = sink.closed() => {
219 break Ok(())
221 },
222 maybe_item = stream.next() => {
223 let item = match maybe_item {
224 Some(item) => item,
225 None => {
226 break Ok(())
228 },
229 };
230 let msg = SubscriptionMessage::from_json(&item).map_err(SubscriptionSerializeError::new)?;
231 if sink.send(msg).await.is_err() {
232 break Ok(());
233 }
234 }
235 }
236 }
237}
238
239impl<Eth> std::fmt::Debug for EthPubSub<Eth> {
240 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
241 f.debug_struct("EthPubSub").finish_non_exhaustive()
242 }
243}
244
/// Container for the state that subscription tasks work on.
#[derive(Clone)]
struct EthPubSubInner<EthApi> {
    /// The API instance the subscription streams and sync status are derived from.
    eth_api: EthApi,
}
251
252impl<Eth> EthPubSubInner<Eth>
255where
256 Eth: RpcNodeCore<Provider: BlockNumReader>,
257{
258 fn sync_status(&self, is_syncing: bool) -> PubSubSyncStatus {
260 if is_syncing {
261 let current_block = self
262 .eth_api
263 .provider()
264 .chain_info()
265 .map(|info| info.best_number)
266 .unwrap_or_default();
267 PubSubSyncStatus::Detailed(SyncStatusMetadata {
268 syncing: true,
269 starting_block: 0,
270 current_block,
271 highest_block: Some(current_block),
272 })
273 } else {
274 PubSubSyncStatus::Simple(false)
275 }
276 }
277}
278
279impl<Eth> EthPubSubInner<Eth>
280where
281 Eth: RpcNodeCore<Pool: TransactionPool>,
282{
283 fn pending_transaction_hashes_stream(&self) -> impl Stream<Item = TxHash> {
285 ReceiverStream::new(self.eth_api.pool().pending_transactions_listener())
286 }
287
288 fn full_pending_transaction_stream(
290 &self,
291 ) -> impl Stream<Item = NewTransactionEvent<<Eth::Pool as TransactionPool>::Transaction>> {
292 self.eth_api.pool().new_pending_pool_transactions_listener()
293 }
294}
295
296impl<N: NodePrimitives, Eth> EthPubSubInner<Eth>
297where
298 Eth: RpcNodeCore<Provider: CanonStateSubscriptions<Primitives = N>>,
299{
300 fn new_headers_stream(&self) -> impl Stream<Item = Header<N::BlockHeader>> {
302 self.eth_api.provider().canonical_state_stream().flat_map(|new_chain| {
303 let headers = new_chain.committed().headers().collect::<Vec<_>>();
304 futures::stream::iter(
305 headers.into_iter().map(|h| Header::from_consensus(h.into(), None, None)),
306 )
307 })
308 }
309
310 fn log_stream(&self, filter: FilteredParams) -> impl Stream<Item = Log> {
312 BroadcastStream::new(self.eth_api.provider().subscribe_to_canonical_state())
313 .map(move |canon_state| {
314 canon_state.expect("new block subscription never ends").block_receipts()
315 })
316 .flat_map(futures::stream::iter)
317 .flat_map(move |(block_receipts, removed)| {
318 let all_logs = logs_utils::matching_block_logs_with_tx_hashes(
319 &filter,
320 block_receipts.block,
321 block_receipts.tx_receipts.iter().map(|(tx, receipt)| (*tx, receipt)),
322 removed,
323 );
324 futures::stream::iter(all_logs)
325 })
326 }
327}