1use std::sync::Arc;
4
5use alloy_primitives::TxHash;
6use alloy_rpc_types_eth::{
7 pubsub::{Params, PubSubSyncStatus, SubscriptionKind, SyncStatusMetadata},
8 Filter, Log,
9};
10use futures::StreamExt;
11use jsonrpsee::{
12 server::SubscriptionMessage, types::ErrorObject, PendingSubscriptionSink, SubscriptionSink,
13};
14use reth_chain_state::CanonStateSubscriptions;
15use reth_network_api::NetworkInfo;
16use reth_rpc_convert::RpcHeader;
17use reth_rpc_eth_api::{
18 pubsub::EthPubSubApiServer, EthApiTypes, RpcConvert, RpcNodeCore, RpcTransaction,
19};
20use reth_rpc_eth_types::logs_utils;
21use reth_rpc_server_types::result::{internal_rpc_err, invalid_params_rpc_err};
22use reth_storage_api::BlockNumReader;
23use reth_tasks::Runtime;
24use reth_transaction_pool::{NewTransactionEvent, TransactionPool};
25use serde::Serialize;
26use tokio_stream::{
27 wrappers::{BroadcastStream, ReceiverStream},
28 Stream,
29};
30use tracing::error;
31
32#[derive(Clone)]
36pub struct EthPubSub<Eth> {
37 inner: Arc<EthPubSubInner<Eth>>,
39}
40
41impl<Eth> EthPubSub<Eth> {
44 pub fn new(eth_api: Eth, subscription_task_spawner: Runtime) -> Self {
46 let inner = EthPubSubInner { eth_api, subscription_task_spawner };
47 Self { inner: Arc::new(inner) }
48 }
49}
50
51impl<Eth> EthPubSub<Eth>
52where
53 Eth: RpcNodeCore + EthApiTypes<RpcConvert: RpcConvert<Primitives = Eth::Primitives>>,
54{
55 pub fn sync_status(&self, is_syncing: bool) -> PubSubSyncStatus {
57 self.inner.sync_status(is_syncing)
58 }
59
60 pub fn pending_transaction_hashes_stream(&self) -> impl Stream<Item = TxHash> {
62 self.inner.pending_transaction_hashes_stream()
63 }
64
65 pub fn full_pending_transaction_stream(
67 &self,
68 ) -> impl Stream<Item = NewTransactionEvent<<Eth::Pool as TransactionPool>::Transaction>> {
69 self.inner.full_pending_transaction_stream()
70 }
71
72 pub fn new_headers_stream(&self) -> impl Stream<Item = RpcHeader<Eth::NetworkTypes>> {
74 self.inner.new_headers_stream()
75 }
76
77 pub fn log_stream(&self, filter: Filter) -> impl Stream<Item = Log> {
79 self.inner.log_stream(filter)
80 }
81
82 pub async fn handle_accepted(
84 &self,
85 accepted_sink: SubscriptionSink,
86 kind: SubscriptionKind,
87 params: Option<Params>,
88 ) -> Result<(), ErrorObject<'static>> {
89 #[allow(unreachable_patterns)]
90 match kind {
91 SubscriptionKind::NewHeads => {
92 pipe_from_stream(accepted_sink, self.new_headers_stream()).await
93 }
94 SubscriptionKind::Logs => {
95 let filter = match params {
97 Some(Params::Logs(filter)) => *filter,
98 Some(Params::Bool(_)) => {
99 return Err(invalid_params_rpc_err("Invalid params for logs"))
100 }
101 _ => Default::default(),
102 };
103 pipe_from_stream(accepted_sink, self.log_stream(filter)).await
104 }
105 SubscriptionKind::NewPendingTransactions => {
106 if let Some(params) = params {
107 match params {
108 Params::Bool(true) => {
109 let stream = self.full_pending_transaction_stream().filter_map(|tx| {
111 let tx_value = match self
112 .inner
113 .eth_api
114 .converter()
115 .fill_pending(tx.transaction.to_consensus())
116 {
117 Ok(tx) => Some(tx),
118 Err(err) => {
119 error!(target = "rpc",
120 %err,
121 "Failed to fill transaction with block context"
122 );
123 None
124 }
125 };
126 std::future::ready(tx_value)
127 });
128 return pipe_from_stream(accepted_sink, stream).await
129 }
130 Params::Bool(false) | Params::None => {
131 }
133 _ => {
134 return Err(invalid_params_rpc_err(
135 "Invalid params for newPendingTransactions",
136 ))
137 }
138 }
139 }
140
141 pipe_from_stream(accepted_sink, self.pending_transaction_hashes_stream()).await
142 }
143 SubscriptionKind::Syncing => {
144 let mut canon_state = BroadcastStream::new(
146 self.inner.eth_api.provider().subscribe_to_canonical_state(),
147 );
148 let mut initial_sync_status = self.inner.eth_api.network().is_syncing();
150 let current_sub_res = self.sync_status(initial_sync_status);
151
152 let msg = SubscriptionMessage::new(
154 accepted_sink.method_name(),
155 accepted_sink.subscription_id(),
156 ¤t_sub_res,
157 )
158 .map_err(SubscriptionSerializeError::new)?;
159
160 if accepted_sink.send(msg).await.is_err() {
161 return Ok(())
162 }
163
164 while canon_state.next().await.is_some() {
165 let current_syncing = self.inner.eth_api.network().is_syncing();
166 if current_syncing != initial_sync_status {
168 initial_sync_status = current_syncing;
170
171 let sync_status = self.sync_status(current_syncing);
173 let msg = SubscriptionMessage::new(
174 accepted_sink.method_name(),
175 accepted_sink.subscription_id(),
176 &sync_status,
177 )
178 .map_err(SubscriptionSerializeError::new)?;
179
180 if accepted_sink.send(msg).await.is_err() {
181 break
182 }
183 }
184 }
185
186 Ok(())
187 }
188 _ => {
189 Err(invalid_params_rpc_err("Unsupported subscription kind"))
191 }
192 }
193 }
194}
195
196#[async_trait::async_trait]
197impl<Eth> EthPubSubApiServer<RpcTransaction<Eth::NetworkTypes>> for EthPubSub<Eth>
198where
199 Eth: RpcNodeCore + EthApiTypes<RpcConvert: RpcConvert<Primitives = Eth::Primitives>>,
200{
201 async fn subscribe(
203 &self,
204 pending: PendingSubscriptionSink,
205 kind: SubscriptionKind,
206 params: Option<Params>,
207 ) -> jsonrpsee::core::SubscriptionResult {
208 let sink = pending.accept().await?;
209 let pubsub = self.clone();
210 self.inner.subscription_task_spawner.spawn_task(async move {
211 let _ = pubsub.handle_accepted(sink, kind, params).await;
212 });
213
214 Ok(())
215 }
216}
217
218#[derive(Debug, thiserror::Error)]
220#[error("Failed to serialize subscription item: {0}")]
221pub struct SubscriptionSerializeError(#[from] serde_json::Error);
222
223impl SubscriptionSerializeError {
224 const fn new(err: serde_json::Error) -> Self {
225 Self(err)
226 }
227}
228
229impl From<SubscriptionSerializeError> for ErrorObject<'static> {
230 fn from(value: SubscriptionSerializeError) -> Self {
231 internal_rpc_err(value.to_string())
232 }
233}
234
235async fn pipe_from_stream<T, St>(
237 sink: SubscriptionSink,
238 mut stream: St,
239) -> Result<(), ErrorObject<'static>>
240where
241 St: Stream<Item = T> + Unpin,
242 T: Serialize,
243{
244 loop {
245 tokio::select! {
246 _ = sink.closed() => {
247 break Ok(())
249 },
250 maybe_item = stream.next() => {
251 let item = match maybe_item {
252 Some(item) => item,
253 None => {
254 break Ok(())
256 },
257 };
258 let msg = SubscriptionMessage::new(
259 sink.method_name(),
260 sink.subscription_id(),
261 &item
262 ).map_err(SubscriptionSerializeError::new)?;
263
264 if sink.send(msg).await.is_err() {
265 break Ok(());
266 }
267 }
268 }
269 }
270}
271
272impl<Eth> std::fmt::Debug for EthPubSub<Eth> {
273 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
274 f.debug_struct("EthPubSub").finish_non_exhaustive()
275 }
276}
277
278#[derive(Clone)]
280struct EthPubSubInner<EthApi> {
281 eth_api: EthApi,
283 subscription_task_spawner: Runtime,
285}
286
287impl<Eth> EthPubSubInner<Eth>
290where
291 Eth: RpcNodeCore<Provider: BlockNumReader>,
292{
293 fn sync_status(&self, is_syncing: bool) -> PubSubSyncStatus {
295 if is_syncing {
296 let current_block = self
297 .eth_api
298 .provider()
299 .chain_info()
300 .map(|info| info.best_number)
301 .unwrap_or_default();
302 PubSubSyncStatus::Detailed(SyncStatusMetadata {
303 syncing: true,
304 starting_block: 0,
305 current_block,
306 highest_block: Some(current_block),
307 })
308 } else {
309 PubSubSyncStatus::Simple(false)
310 }
311 }
312}
313
314impl<Eth> EthPubSubInner<Eth>
315where
316 Eth: RpcNodeCore<Pool: TransactionPool>,
317{
318 fn pending_transaction_hashes_stream(&self) -> impl Stream<Item = TxHash> {
320 ReceiverStream::new(self.eth_api.pool().pending_transactions_listener())
321 }
322
323 fn full_pending_transaction_stream(
325 &self,
326 ) -> impl Stream<Item = NewTransactionEvent<<Eth::Pool as TransactionPool>::Transaction>> {
327 self.eth_api.pool().new_pending_pool_transactions_listener()
328 }
329}
330
331impl<Eth> EthPubSubInner<Eth>
332where
333 Eth: EthApiTypes<RpcConvert: RpcConvert<Primitives = Eth::Primitives>> + RpcNodeCore,
334{
335 fn new_headers_stream(&self) -> impl Stream<Item = RpcHeader<Eth::NetworkTypes>> {
337 let converter = self.eth_api.converter();
338 self.eth_api.provider().canonical_state_stream().flat_map(|new_chain| {
339 let headers = new_chain
340 .committed()
341 .blocks_iter()
342 .filter_map(|block| {
343 match converter.convert_header(block.clone_sealed_header(), block.rlp_length())
344 {
345 Ok(header) => Some(header),
346 Err(err) => {
347 error!(target = "rpc", %err, "Failed to convert header");
348 None
349 }
350 }
351 })
352 .collect::<Vec<_>>();
353 futures::stream::iter(headers)
354 })
355 }
356
357 fn log_stream(&self, filter: Filter) -> impl Stream<Item = Log> {
359 BroadcastStream::new(self.eth_api.provider().subscribe_to_canonical_state())
360 .map(move |canon_state| {
361 canon_state.expect("new block subscription never ends").block_receipts()
362 })
363 .flat_map(futures::stream::iter)
364 .flat_map(move |(block_receipts, removed)| {
365 let all_logs = logs_utils::matching_block_logs_with_tx_hashes(
366 &filter,
367 block_receipts.block,
368 block_receipts.timestamp,
369 block_receipts.tx_receipts.iter().map(|(tx, receipt)| (*tx, receipt)),
370 removed,
371 );
372 futures::stream::iter(all_logs)
373 })
374 }
375}