1use std::sync::Arc;
4
5use alloy_primitives::TxHash;
6use alloy_rpc_types_eth::{
7 pubsub::{Params, PubSubSyncStatus, SubscriptionKind, SyncStatusMetadata},
8 Filter, Log,
9};
10use futures::StreamExt;
11use jsonrpsee::{
12 server::SubscriptionMessage, types::ErrorObject, PendingSubscriptionSink, SubscriptionSink,
13};
14use reth_chain_state::CanonStateSubscriptions;
15use reth_network_api::NetworkInfo;
16use reth_rpc_convert::RpcHeader;
17use reth_rpc_eth_api::{
18 pubsub::EthPubSubApiServer, EthApiTypes, RpcConvert, RpcNodeCore, RpcTransaction,
19};
20use reth_rpc_eth_types::logs_utils;
21use reth_rpc_server_types::result::{internal_rpc_err, invalid_params_rpc_err};
22use reth_storage_api::BlockNumReader;
23use reth_tasks::{TaskSpawner, TokioTaskExecutor};
24use reth_transaction_pool::{NewTransactionEvent, TransactionPool};
25use serde::Serialize;
26use tokio_stream::{
27 wrappers::{BroadcastStream, ReceiverStream},
28 Stream,
29};
30use tracing::error;
31
32#[derive(Clone)]
36pub struct EthPubSub<Eth> {
37 inner: Arc<EthPubSubInner<Eth>>,
39}
40
41impl<Eth> EthPubSub<Eth> {
44 pub fn new(eth_api: Eth) -> Self {
48 Self::with_spawner(eth_api, Box::<TokioTaskExecutor>::default())
49 }
50
51 pub fn with_spawner(eth_api: Eth, subscription_task_spawner: Box<dyn TaskSpawner>) -> Self {
53 let inner = EthPubSubInner { eth_api, subscription_task_spawner };
54 Self { inner: Arc::new(inner) }
55 }
56}
57
58impl<Eth> EthPubSub<Eth>
59where
60 Eth: RpcNodeCore + EthApiTypes<RpcConvert: RpcConvert<Primitives = Eth::Primitives>>,
61{
62 pub fn sync_status(&self, is_syncing: bool) -> PubSubSyncStatus {
64 self.inner.sync_status(is_syncing)
65 }
66
67 pub fn pending_transaction_hashes_stream(&self) -> impl Stream<Item = TxHash> {
69 self.inner.pending_transaction_hashes_stream()
70 }
71
72 pub fn full_pending_transaction_stream(
74 &self,
75 ) -> impl Stream<Item = NewTransactionEvent<<Eth::Pool as TransactionPool>::Transaction>> {
76 self.inner.full_pending_transaction_stream()
77 }
78
79 pub fn new_headers_stream(&self) -> impl Stream<Item = RpcHeader<Eth::NetworkTypes>> {
81 self.inner.new_headers_stream()
82 }
83
84 pub fn log_stream(&self, filter: Filter) -> impl Stream<Item = Log> {
86 self.inner.log_stream(filter)
87 }
88
89 pub async fn handle_accepted(
91 &self,
92 accepted_sink: SubscriptionSink,
93 kind: SubscriptionKind,
94 params: Option<Params>,
95 ) -> Result<(), ErrorObject<'static>> {
96 #[allow(unreachable_patterns)]
97 match kind {
98 SubscriptionKind::NewHeads => {
99 pipe_from_stream(accepted_sink, self.new_headers_stream()).await
100 }
101 SubscriptionKind::Logs => {
102 let filter = match params {
104 Some(Params::Logs(filter)) => *filter,
105 Some(Params::Bool(_)) => {
106 return Err(invalid_params_rpc_err("Invalid params for logs"))
107 }
108 _ => Default::default(),
109 };
110 pipe_from_stream(accepted_sink, self.log_stream(filter)).await
111 }
112 SubscriptionKind::NewPendingTransactions => {
113 if let Some(params) = params {
114 match params {
115 Params::Bool(true) => {
116 let stream = self.full_pending_transaction_stream().filter_map(|tx| {
118 let tx_value = match self
119 .inner
120 .eth_api
121 .converter()
122 .fill_pending(tx.transaction.to_consensus())
123 {
124 Ok(tx) => Some(tx),
125 Err(err) => {
126 error!(target = "rpc",
127 %err,
128 "Failed to fill transaction with block context"
129 );
130 None
131 }
132 };
133 std::future::ready(tx_value)
134 });
135 return pipe_from_stream(accepted_sink, stream).await
136 }
137 Params::Bool(false) | Params::None => {
138 }
140 Params::Logs(_) => {
141 return Err(invalid_params_rpc_err(
142 "Invalid params for newPendingTransactions",
143 ))
144 }
145 }
146 }
147
148 pipe_from_stream(accepted_sink, self.pending_transaction_hashes_stream()).await
149 }
150 SubscriptionKind::Syncing => {
151 let mut canon_state = BroadcastStream::new(
153 self.inner.eth_api.provider().subscribe_to_canonical_state(),
154 );
155 let mut initial_sync_status = self.inner.eth_api.network().is_syncing();
157 let current_sub_res = self.sync_status(initial_sync_status);
158
159 let msg = SubscriptionMessage::new(
161 accepted_sink.method_name(),
162 accepted_sink.subscription_id(),
163 ¤t_sub_res,
164 )
165 .map_err(SubscriptionSerializeError::new)?;
166
167 if accepted_sink.send(msg).await.is_err() {
168 return Ok(())
169 }
170
171 while canon_state.next().await.is_some() {
172 let current_syncing = self.inner.eth_api.network().is_syncing();
173 if current_syncing != initial_sync_status {
175 initial_sync_status = current_syncing;
177
178 let sync_status = self.sync_status(current_syncing);
180 let msg = SubscriptionMessage::new(
181 accepted_sink.method_name(),
182 accepted_sink.subscription_id(),
183 &sync_status,
184 )
185 .map_err(SubscriptionSerializeError::new)?;
186
187 if accepted_sink.send(msg).await.is_err() {
188 break
189 }
190 }
191 }
192
193 Ok(())
194 }
195 _ => {
196 Err(invalid_params_rpc_err("Unsupported subscription kind"))
198 }
199 }
200 }
201}
202
203#[async_trait::async_trait]
204impl<Eth> EthPubSubApiServer<RpcTransaction<Eth::NetworkTypes>> for EthPubSub<Eth>
205where
206 Eth: RpcNodeCore + EthApiTypes<RpcConvert: RpcConvert<Primitives = Eth::Primitives>>,
207{
208 async fn subscribe(
210 &self,
211 pending: PendingSubscriptionSink,
212 kind: SubscriptionKind,
213 params: Option<Params>,
214 ) -> jsonrpsee::core::SubscriptionResult {
215 let sink = pending.accept().await?;
216 let pubsub = self.clone();
217 self.inner.subscription_task_spawner.spawn(Box::pin(async move {
218 let _ = pubsub.handle_accepted(sink, kind, params).await;
219 }));
220
221 Ok(())
222 }
223}
224
225#[derive(Debug, thiserror::Error)]
227#[error("Failed to serialize subscription item: {0}")]
228pub struct SubscriptionSerializeError(#[from] serde_json::Error);
229
230impl SubscriptionSerializeError {
231 const fn new(err: serde_json::Error) -> Self {
232 Self(err)
233 }
234}
235
236impl From<SubscriptionSerializeError> for ErrorObject<'static> {
237 fn from(value: SubscriptionSerializeError) -> Self {
238 internal_rpc_err(value.to_string())
239 }
240}
241
242async fn pipe_from_stream<T, St>(
244 sink: SubscriptionSink,
245 mut stream: St,
246) -> Result<(), ErrorObject<'static>>
247where
248 St: Stream<Item = T> + Unpin,
249 T: Serialize,
250{
251 loop {
252 tokio::select! {
253 _ = sink.closed() => {
254 break Ok(())
256 },
257 maybe_item = stream.next() => {
258 let item = match maybe_item {
259 Some(item) => item,
260 None => {
261 break Ok(())
263 },
264 };
265 let msg = SubscriptionMessage::new(
266 sink.method_name(),
267 sink.subscription_id(),
268 &item
269 ).map_err(SubscriptionSerializeError::new)?;
270
271 if sink.send(msg).await.is_err() {
272 break Ok(());
273 }
274 }
275 }
276 }
277}
278
279impl<Eth> std::fmt::Debug for EthPubSub<Eth> {
280 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
281 f.debug_struct("EthPubSub").finish_non_exhaustive()
282 }
283}
284
285#[derive(Clone)]
287struct EthPubSubInner<EthApi> {
288 eth_api: EthApi,
290 subscription_task_spawner: Box<dyn TaskSpawner>,
292}
293
294impl<Eth> EthPubSubInner<Eth>
297where
298 Eth: RpcNodeCore<Provider: BlockNumReader>,
299{
300 fn sync_status(&self, is_syncing: bool) -> PubSubSyncStatus {
302 if is_syncing {
303 let current_block = self
304 .eth_api
305 .provider()
306 .chain_info()
307 .map(|info| info.best_number)
308 .unwrap_or_default();
309 PubSubSyncStatus::Detailed(SyncStatusMetadata {
310 syncing: true,
311 starting_block: 0,
312 current_block,
313 highest_block: Some(current_block),
314 })
315 } else {
316 PubSubSyncStatus::Simple(false)
317 }
318 }
319}
320
321impl<Eth> EthPubSubInner<Eth>
322where
323 Eth: RpcNodeCore<Pool: TransactionPool>,
324{
325 fn pending_transaction_hashes_stream(&self) -> impl Stream<Item = TxHash> {
327 ReceiverStream::new(self.eth_api.pool().pending_transactions_listener())
328 }
329
330 fn full_pending_transaction_stream(
332 &self,
333 ) -> impl Stream<Item = NewTransactionEvent<<Eth::Pool as TransactionPool>::Transaction>> {
334 self.eth_api.pool().new_pending_pool_transactions_listener()
335 }
336}
337
338impl<Eth> EthPubSubInner<Eth>
339where
340 Eth: EthApiTypes<RpcConvert: RpcConvert<Primitives = Eth::Primitives>> + RpcNodeCore,
341{
342 fn new_headers_stream(&self) -> impl Stream<Item = RpcHeader<Eth::NetworkTypes>> {
344 let converter = self.eth_api.converter();
345 self.eth_api.provider().canonical_state_stream().flat_map(|new_chain| {
346 let headers = new_chain
347 .committed()
348 .blocks_iter()
349 .filter_map(|block| {
350 match converter.convert_header(block.clone_sealed_header(), block.rlp_length())
351 {
352 Ok(header) => Some(header),
353 Err(err) => {
354 error!(target = "rpc", %err, "Failed to convert header");
355 None
356 }
357 }
358 })
359 .collect::<Vec<_>>();
360 futures::stream::iter(headers)
361 })
362 }
363
364 fn log_stream(&self, filter: Filter) -> impl Stream<Item = Log> {
366 BroadcastStream::new(self.eth_api.provider().subscribe_to_canonical_state())
367 .map(move |canon_state| {
368 canon_state.expect("new block subscription never ends").block_receipts()
369 })
370 .flat_map(futures::stream::iter)
371 .flat_map(move |(block_receipts, removed)| {
372 let all_logs = logs_utils::matching_block_logs_with_tx_hashes(
373 &filter,
374 block_receipts.block,
375 block_receipts.timestamp,
376 block_receipts.tx_receipts.iter().map(|(tx, receipt)| (*tx, receipt)),
377 removed,
378 );
379 futures::stream::iter(all_logs)
380 })
381 }
382}