1use std::sync::Arc;
4
5use alloy_primitives::{TxHash, U256};
6use alloy_rpc_types_eth::{
7 pubsub::{Params, PubSubSyncStatus, SubscriptionKind, SyncStatusMetadata},
8 Filter, Header, Log,
9};
10use futures::StreamExt;
11use jsonrpsee::{
12 server::SubscriptionMessage, types::ErrorObject, PendingSubscriptionSink, SubscriptionSink,
13};
14use reth_chain_state::CanonStateSubscriptions;
15use reth_network_api::NetworkInfo;
16use reth_primitives_traits::NodePrimitives;
17use reth_rpc_eth_api::{
18 pubsub::EthPubSubApiServer, EthApiTypes, RpcConvert, RpcNodeCore, RpcTransaction,
19};
20use reth_rpc_eth_types::logs_utils;
21use reth_rpc_server_types::result::{internal_rpc_err, invalid_params_rpc_err};
22use reth_storage_api::BlockNumReader;
23use reth_tasks::{TaskSpawner, TokioTaskExecutor};
24use reth_transaction_pool::{NewTransactionEvent, PoolConsensusTx, TransactionPool};
25use serde::Serialize;
26use tokio_stream::{
27 wrappers::{BroadcastStream, ReceiverStream},
28 Stream,
29};
30use tracing::error;
31
32#[derive(Clone)]
36pub struct EthPubSub<Eth> {
37 inner: Arc<EthPubSubInner<Eth>>,
39}
40
41impl<Eth> EthPubSub<Eth> {
44 pub fn new(eth_api: Eth) -> Self {
48 Self::with_spawner(eth_api, Box::<TokioTaskExecutor>::default())
49 }
50
51 pub fn with_spawner(eth_api: Eth, subscription_task_spawner: Box<dyn TaskSpawner>) -> Self {
53 let inner = EthPubSubInner { eth_api, subscription_task_spawner };
54 Self { inner: Arc::new(inner) }
55 }
56}
57
58impl<N: NodePrimitives, Eth> EthPubSub<Eth>
59where
60 Eth: RpcNodeCore<
61 Provider: BlockNumReader + CanonStateSubscriptions<Primitives = N>,
62 Pool: TransactionPool,
63 Network: NetworkInfo,
64 > + EthApiTypes<
65 RpcConvert: RpcConvert<
66 Primitives: NodePrimitives<SignedTx = PoolConsensusTx<Eth::Pool>>,
67 >,
68 >,
69{
70 pub fn sync_status(&self, is_syncing: bool) -> PubSubSyncStatus {
72 self.inner.sync_status(is_syncing)
73 }
74
75 pub fn pending_transaction_hashes_stream(&self) -> impl Stream<Item = TxHash> {
77 self.inner.pending_transaction_hashes_stream()
78 }
79
80 pub fn full_pending_transaction_stream(
82 &self,
83 ) -> impl Stream<Item = NewTransactionEvent<<Eth::Pool as TransactionPool>::Transaction>> {
84 self.inner.full_pending_transaction_stream()
85 }
86
87 pub fn new_headers_stream(&self) -> impl Stream<Item = Header<N::BlockHeader>> {
89 self.inner.new_headers_stream()
90 }
91
92 pub fn log_stream(&self, filter: Filter) -> impl Stream<Item = Log> {
94 self.inner.log_stream(filter)
95 }
96
97 pub async fn handle_accepted(
99 &self,
100 accepted_sink: SubscriptionSink,
101 kind: SubscriptionKind,
102 params: Option<Params>,
103 ) -> Result<(), ErrorObject<'static>> {
104 #[allow(unreachable_patterns)]
105 match kind {
106 SubscriptionKind::NewHeads => {
107 pipe_from_stream(accepted_sink, self.new_headers_stream()).await
108 }
109 SubscriptionKind::Logs => {
110 let filter = match params {
112 Some(Params::Logs(filter)) => *filter,
113 Some(Params::Bool(_)) => {
114 return Err(invalid_params_rpc_err("Invalid params for logs"))
115 }
116 _ => Default::default(),
117 };
118 pipe_from_stream(accepted_sink, self.log_stream(filter)).await
119 }
120 SubscriptionKind::NewPendingTransactions => {
121 if let Some(params) = params {
122 match params {
123 Params::Bool(true) => {
124 let stream = self.full_pending_transaction_stream().filter_map(|tx| {
126 let tx_value = match self
127 .inner
128 .eth_api
129 .tx_resp_builder()
130 .fill_pending(tx.transaction.to_consensus())
131 {
132 Ok(tx) => Some(tx),
133 Err(err) => {
134 error!(target = "rpc",
135 %err,
136 "Failed to fill transaction with block context"
137 );
138 None
139 }
140 };
141 std::future::ready(tx_value)
142 });
143 return pipe_from_stream(accepted_sink, stream).await
144 }
145 Params::Bool(false) | Params::None => {
146 }
148 Params::Logs(_) => {
149 return Err(invalid_params_rpc_err(
150 "Invalid params for newPendingTransactions",
151 ))
152 }
153 }
154 }
155
156 pipe_from_stream(accepted_sink, self.pending_transaction_hashes_stream()).await
157 }
158 SubscriptionKind::Syncing => {
159 let mut canon_state = BroadcastStream::new(
161 self.inner.eth_api.provider().subscribe_to_canonical_state(),
162 );
163 let mut initial_sync_status = self.inner.eth_api.network().is_syncing();
165 let current_sub_res = self.sync_status(initial_sync_status);
166
167 let msg = SubscriptionMessage::new(
169 accepted_sink.method_name(),
170 accepted_sink.subscription_id(),
171 ¤t_sub_res,
172 )
173 .map_err(SubscriptionSerializeError::new)?;
174
175 if accepted_sink.send(msg).await.is_err() {
176 return Ok(())
177 }
178
179 while canon_state.next().await.is_some() {
180 let current_syncing = self.inner.eth_api.network().is_syncing();
181 if current_syncing != initial_sync_status {
183 initial_sync_status = current_syncing;
185
186 let sync_status = self.sync_status(current_syncing);
188 let msg = SubscriptionMessage::new(
189 accepted_sink.method_name(),
190 accepted_sink.subscription_id(),
191 &sync_status,
192 )
193 .map_err(SubscriptionSerializeError::new)?;
194
195 if accepted_sink.send(msg).await.is_err() {
196 break
197 }
198 }
199 }
200
201 Ok(())
202 }
203 _ => {
204 Err(invalid_params_rpc_err("Unsupported subscription kind"))
206 }
207 }
208 }
209}
210
211#[async_trait::async_trait]
212impl<Eth> EthPubSubApiServer<RpcTransaction<Eth::NetworkTypes>> for EthPubSub<Eth>
213where
214 Eth: RpcNodeCore<
215 Provider: BlockNumReader + CanonStateSubscriptions,
216 Pool: TransactionPool,
217 Network: NetworkInfo,
218 > + EthApiTypes<
219 RpcConvert: RpcConvert<
220 Primitives: NodePrimitives<SignedTx = PoolConsensusTx<Eth::Pool>>,
221 >,
222 > + 'static,
223{
224 async fn subscribe(
226 &self,
227 pending: PendingSubscriptionSink,
228 kind: SubscriptionKind,
229 params: Option<Params>,
230 ) -> jsonrpsee::core::SubscriptionResult {
231 let sink = pending.accept().await?;
232 let pubsub = self.clone();
233 self.inner.subscription_task_spawner.spawn(Box::pin(async move {
234 let _ = pubsub.handle_accepted(sink, kind, params).await;
235 }));
236
237 Ok(())
238 }
239}
240
241#[derive(Debug, thiserror::Error)]
243#[error("Failed to serialize subscription item: {0}")]
244pub struct SubscriptionSerializeError(#[from] serde_json::Error);
245
246impl SubscriptionSerializeError {
247 const fn new(err: serde_json::Error) -> Self {
248 Self(err)
249 }
250}
251
252impl From<SubscriptionSerializeError> for ErrorObject<'static> {
253 fn from(value: SubscriptionSerializeError) -> Self {
254 internal_rpc_err(value.to_string())
255 }
256}
257
258async fn pipe_from_stream<T, St>(
260 sink: SubscriptionSink,
261 mut stream: St,
262) -> Result<(), ErrorObject<'static>>
263where
264 St: Stream<Item = T> + Unpin,
265 T: Serialize,
266{
267 loop {
268 tokio::select! {
269 _ = sink.closed() => {
270 break Ok(())
272 },
273 maybe_item = stream.next() => {
274 let item = match maybe_item {
275 Some(item) => item,
276 None => {
277 break Ok(())
279 },
280 };
281 let msg = SubscriptionMessage::new(
282 sink.method_name(),
283 sink.subscription_id(),
284 &item
285 ).map_err(SubscriptionSerializeError::new)?;
286
287 if sink.send(msg).await.is_err() {
288 break Ok(());
289 }
290 }
291 }
292 }
293}
294
295impl<Eth> std::fmt::Debug for EthPubSub<Eth> {
296 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
297 f.debug_struct("EthPubSub").finish_non_exhaustive()
298 }
299}
300
301#[derive(Clone)]
303struct EthPubSubInner<EthApi> {
304 eth_api: EthApi,
306 subscription_task_spawner: Box<dyn TaskSpawner>,
308}
309
310impl<Eth> EthPubSubInner<Eth>
313where
314 Eth: RpcNodeCore<Provider: BlockNumReader>,
315{
316 fn sync_status(&self, is_syncing: bool) -> PubSubSyncStatus {
318 if is_syncing {
319 let current_block = self
320 .eth_api
321 .provider()
322 .chain_info()
323 .map(|info| info.best_number)
324 .unwrap_or_default();
325 PubSubSyncStatus::Detailed(SyncStatusMetadata {
326 syncing: true,
327 starting_block: 0,
328 current_block,
329 highest_block: Some(current_block),
330 })
331 } else {
332 PubSubSyncStatus::Simple(false)
333 }
334 }
335}
336
337impl<Eth> EthPubSubInner<Eth>
338where
339 Eth: RpcNodeCore<Pool: TransactionPool>,
340{
341 fn pending_transaction_hashes_stream(&self) -> impl Stream<Item = TxHash> {
343 ReceiverStream::new(self.eth_api.pool().pending_transactions_listener())
344 }
345
346 fn full_pending_transaction_stream(
348 &self,
349 ) -> impl Stream<Item = NewTransactionEvent<<Eth::Pool as TransactionPool>::Transaction>> {
350 self.eth_api.pool().new_pending_pool_transactions_listener()
351 }
352}
353
354impl<N: NodePrimitives, Eth> EthPubSubInner<Eth>
355where
356 Eth: RpcNodeCore<Provider: CanonStateSubscriptions<Primitives = N>>,
357{
358 fn new_headers_stream(&self) -> impl Stream<Item = Header<N::BlockHeader>> {
360 self.eth_api.provider().canonical_state_stream().flat_map(|new_chain| {
361 let headers = new_chain
362 .committed()
363 .blocks_iter()
364 .map(|block| {
365 Header::from_consensus(
366 block.clone_sealed_header().into(),
367 None,
368 Some(U256::from(block.rlp_length())),
369 )
370 })
371 .collect::<Vec<_>>();
372 futures::stream::iter(headers)
373 })
374 }
375
376 fn log_stream(&self, filter: Filter) -> impl Stream<Item = Log> {
378 BroadcastStream::new(self.eth_api.provider().subscribe_to_canonical_state())
379 .map(move |canon_state| {
380 canon_state.expect("new block subscription never ends").block_receipts()
381 })
382 .flat_map(futures::stream::iter)
383 .flat_map(move |(block_receipts, removed)| {
384 let all_logs = logs_utils::matching_block_logs_with_tx_hashes(
385 &filter,
386 block_receipts.block,
387 block_receipts.timestamp,
388 block_receipts.tx_receipts.iter().map(|(tx, receipt)| (*tx, receipt)),
389 removed,
390 );
391 futures::stream::iter(all_logs)
392 })
393 }
394}