Skip to main content

reth_rpc/eth/
pubsub.rs

1//! `eth_` `PubSub` RPC handler implementation
2
3use std::sync::Arc;
4
5use alloy_primitives::TxHash;
6use alloy_rpc_types_eth::{
7    pubsub::{Params, PubSubSyncStatus, SubscriptionKind, SyncStatusMetadata},
8    Filter, Log,
9};
10use futures::StreamExt;
11use jsonrpsee::{
12    server::SubscriptionMessage, types::ErrorObject, PendingSubscriptionSink, SubscriptionSink,
13};
14use reth_chain_state::CanonStateSubscriptions;
15use reth_network_api::NetworkInfo;
16use reth_rpc_convert::RpcHeader;
17use reth_rpc_eth_api::{
18    pubsub::EthPubSubApiServer, EthApiTypes, RpcConvert, RpcNodeCore, RpcTransaction,
19};
20use reth_rpc_eth_types::logs_utils;
21use reth_rpc_server_types::result::{internal_rpc_err, invalid_params_rpc_err};
22use reth_storage_api::BlockNumReader;
23use reth_tasks::Runtime;
24use reth_transaction_pool::{NewTransactionEvent, TransactionPool};
25use serde::Serialize;
26use tokio_stream::{
27    wrappers::{BroadcastStream, ReceiverStream},
28    Stream,
29};
30use tracing::error;
31
32/// `Eth` pubsub RPC implementation.
33///
34/// This handles `eth_subscribe` RPC calls.
35#[derive(Clone)]
36pub struct EthPubSub<Eth> {
37    /// All nested fields bundled together.
38    inner: Arc<EthPubSubInner<Eth>>,
39}
40
41// === impl EthPubSub ===
42
43impl<Eth> EthPubSub<Eth> {
44    /// Creates a new, shareable instance.
45    pub fn new(eth_api: Eth, subscription_task_spawner: Runtime) -> Self {
46        let inner = EthPubSubInner { eth_api, subscription_task_spawner };
47        Self { inner: Arc::new(inner) }
48    }
49}
50
51impl<Eth> EthPubSub<Eth>
52where
53    Eth: RpcNodeCore + EthApiTypes<RpcConvert: RpcConvert<Primitives = Eth::Primitives>>,
54{
55    /// Returns the current sync status for the `syncing` subscription
56    pub fn sync_status(&self, is_syncing: bool) -> PubSubSyncStatus {
57        self.inner.sync_status(is_syncing)
58    }
59
60    /// Returns a stream that yields all transaction hashes emitted by the txpool.
61    pub fn pending_transaction_hashes_stream(&self) -> impl Stream<Item = TxHash> {
62        self.inner.pending_transaction_hashes_stream()
63    }
64
65    /// Returns a stream that yields all transactions emitted by the txpool.
66    pub fn full_pending_transaction_stream(
67        &self,
68    ) -> impl Stream<Item = NewTransactionEvent<<Eth::Pool as TransactionPool>::Transaction>> {
69        self.inner.full_pending_transaction_stream()
70    }
71
72    /// Returns a stream that yields all new RPC blocks.
73    pub fn new_headers_stream(&self) -> impl Stream<Item = RpcHeader<Eth::NetworkTypes>> {
74        self.inner.new_headers_stream()
75    }
76
77    /// Returns a stream that yields all logs that match the given filter.
78    pub fn log_stream(&self, filter: Filter) -> impl Stream<Item = Log> {
79        self.inner.log_stream(filter)
80    }
81
82    /// The actual handler for an accepted [`EthPubSub::subscribe`] call.
83    pub async fn handle_accepted(
84        &self,
85        accepted_sink: SubscriptionSink,
86        kind: SubscriptionKind,
87        params: Option<Params>,
88    ) -> Result<(), ErrorObject<'static>> {
89        #[allow(unreachable_patterns)]
90        match kind {
91            SubscriptionKind::NewHeads => {
92                pipe_from_stream(accepted_sink, self.new_headers_stream()).await
93            }
94            SubscriptionKind::Logs => {
95                // if no params are provided, used default filter params
96                let filter = match params {
97                    Some(Params::Logs(filter)) => *filter,
98                    Some(Params::Bool(_)) => {
99                        return Err(invalid_params_rpc_err("Invalid params for logs"))
100                    }
101                    _ => Default::default(),
102                };
103                pipe_from_stream(accepted_sink, self.log_stream(filter)).await
104            }
105            SubscriptionKind::NewPendingTransactions => {
106                if let Some(params) = params {
107                    match params {
108                        Params::Bool(true) => {
109                            // full transaction objects requested
110                            let stream = self.full_pending_transaction_stream().filter_map(|tx| {
111                                let tx_value = match self
112                                    .inner
113                                    .eth_api
114                                    .converter()
115                                    .fill_pending(tx.transaction.to_consensus())
116                                {
117                                    Ok(tx) => Some(tx),
118                                    Err(err) => {
119                                        error!(target = "rpc",
120                                            %err,
121                                            "Failed to fill transaction with block context"
122                                        );
123                                        None
124                                    }
125                                };
126                                std::future::ready(tx_value)
127                            });
128                            return pipe_from_stream(accepted_sink, stream).await
129                        }
130                        Params::Bool(false) | Params::None => {
131                            // only hashes requested
132                        }
133                        _ => {
134                            return Err(invalid_params_rpc_err(
135                                "Invalid params for newPendingTransactions",
136                            ))
137                        }
138                    }
139                }
140
141                pipe_from_stream(accepted_sink, self.pending_transaction_hashes_stream()).await
142            }
143            SubscriptionKind::Syncing => {
144                // get new block subscription
145                let mut canon_state = BroadcastStream::new(
146                    self.inner.eth_api.provider().subscribe_to_canonical_state(),
147                );
148                // get current sync status
149                let mut initial_sync_status = self.inner.eth_api.network().is_syncing();
150                let current_sub_res = self.sync_status(initial_sync_status);
151
152                // send the current status immediately
153                let msg = SubscriptionMessage::new(
154                    accepted_sink.method_name(),
155                    accepted_sink.subscription_id(),
156                    &current_sub_res,
157                )
158                .map_err(SubscriptionSerializeError::new)?;
159
160                if accepted_sink.send(msg).await.is_err() {
161                    return Ok(())
162                }
163
164                while canon_state.next().await.is_some() {
165                    let current_syncing = self.inner.eth_api.network().is_syncing();
166                    // Only send a new response if the sync status has changed
167                    if current_syncing != initial_sync_status {
168                        // Update the sync status on each new block
169                        initial_sync_status = current_syncing;
170
171                        // send a new message now that the status changed
172                        let sync_status = self.sync_status(current_syncing);
173                        let msg = SubscriptionMessage::new(
174                            accepted_sink.method_name(),
175                            accepted_sink.subscription_id(),
176                            &sync_status,
177                        )
178                        .map_err(SubscriptionSerializeError::new)?;
179
180                        if accepted_sink.send(msg).await.is_err() {
181                            break
182                        }
183                    }
184                }
185
186                Ok(())
187            }
188            _ => {
189                // TODO: implement once https://github.com/alloy-rs/alloy/pull/3410 is released
190                Err(invalid_params_rpc_err("Unsupported subscription kind"))
191            }
192        }
193    }
194}
195
196#[async_trait::async_trait]
197impl<Eth> EthPubSubApiServer<RpcTransaction<Eth::NetworkTypes>> for EthPubSub<Eth>
198where
199    Eth: RpcNodeCore + EthApiTypes<RpcConvert: RpcConvert<Primitives = Eth::Primitives>>,
200{
201    /// Handler for `eth_subscribe`
202    async fn subscribe(
203        &self,
204        pending: PendingSubscriptionSink,
205        kind: SubscriptionKind,
206        params: Option<Params>,
207    ) -> jsonrpsee::core::SubscriptionResult {
208        let sink = pending.accept().await?;
209        let pubsub = self.clone();
210        self.inner.subscription_task_spawner.spawn_task(async move {
211            let _ = pubsub.handle_accepted(sink, kind, params).await;
212        });
213
214        Ok(())
215    }
216}
217
218/// Helper to convert a serde error into an [`ErrorObject`]
219#[derive(Debug, thiserror::Error)]
220#[error("Failed to serialize subscription item: {0}")]
221pub struct SubscriptionSerializeError(#[from] serde_json::Error);
222
223impl SubscriptionSerializeError {
224    const fn new(err: serde_json::Error) -> Self {
225        Self(err)
226    }
227}
228
229impl From<SubscriptionSerializeError> for ErrorObject<'static> {
230    fn from(value: SubscriptionSerializeError) -> Self {
231        internal_rpc_err(value.to_string())
232    }
233}
234
235/// Pipes all stream items to the subscription sink.
236async fn pipe_from_stream<T, St>(
237    sink: SubscriptionSink,
238    mut stream: St,
239) -> Result<(), ErrorObject<'static>>
240where
241    St: Stream<Item = T> + Unpin,
242    T: Serialize,
243{
244    loop {
245        tokio::select! {
246            _ = sink.closed() => {
247                // connection dropped
248                break Ok(())
249            },
250            maybe_item = stream.next() => {
251                let item = match maybe_item {
252                    Some(item) => item,
253                    None => {
254                        // stream ended
255                        break  Ok(())
256                    },
257                };
258                let msg = SubscriptionMessage::new(
259                    sink.method_name(),
260                    sink.subscription_id(),
261                    &item
262                ).map_err(SubscriptionSerializeError::new)?;
263
264                if sink.send(msg).await.is_err() {
265                    break Ok(());
266                }
267            }
268        }
269    }
270}
271
272impl<Eth> std::fmt::Debug for EthPubSub<Eth> {
273    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
274        f.debug_struct("EthPubSub").finish_non_exhaustive()
275    }
276}
277
278/// Container type `EthPubSub`
279#[derive(Clone)]
280struct EthPubSubInner<EthApi> {
281    /// The `eth` API.
282    eth_api: EthApi,
283    /// The type that's used to spawn subscription tasks.
284    subscription_task_spawner: Runtime,
285}
286
287// == impl EthPubSubInner ===
288
289impl<Eth> EthPubSubInner<Eth>
290where
291    Eth: RpcNodeCore<Provider: BlockNumReader>,
292{
293    /// Returns the current sync status for the `syncing` subscription
294    fn sync_status(&self, is_syncing: bool) -> PubSubSyncStatus {
295        if is_syncing {
296            let current_block = self
297                .eth_api
298                .provider()
299                .chain_info()
300                .map(|info| info.best_number)
301                .unwrap_or_default();
302            PubSubSyncStatus::Detailed(SyncStatusMetadata {
303                syncing: true,
304                starting_block: 0,
305                current_block,
306                highest_block: Some(current_block),
307            })
308        } else {
309            PubSubSyncStatus::Simple(false)
310        }
311    }
312}
313
314impl<Eth> EthPubSubInner<Eth>
315where
316    Eth: RpcNodeCore<Pool: TransactionPool>,
317{
318    /// Returns a stream that yields all transaction hashes emitted by the txpool.
319    fn pending_transaction_hashes_stream(&self) -> impl Stream<Item = TxHash> {
320        ReceiverStream::new(self.eth_api.pool().pending_transactions_listener())
321    }
322
323    /// Returns a stream that yields all transactions emitted by the txpool.
324    fn full_pending_transaction_stream(
325        &self,
326    ) -> impl Stream<Item = NewTransactionEvent<<Eth::Pool as TransactionPool>::Transaction>> {
327        self.eth_api.pool().new_pending_pool_transactions_listener()
328    }
329}
330
331impl<Eth> EthPubSubInner<Eth>
332where
333    Eth: EthApiTypes<RpcConvert: RpcConvert<Primitives = Eth::Primitives>> + RpcNodeCore,
334{
335    /// Returns a stream that yields all new RPC blocks.
336    fn new_headers_stream(&self) -> impl Stream<Item = RpcHeader<Eth::NetworkTypes>> {
337        let converter = self.eth_api.converter();
338        self.eth_api.provider().canonical_state_stream().flat_map(|new_chain| {
339            let headers = new_chain
340                .committed()
341                .blocks_iter()
342                .filter_map(|block| {
343                    match converter.convert_header(block.clone_sealed_header(), block.rlp_length())
344                    {
345                        Ok(header) => Some(header),
346                        Err(err) => {
347                            error!(target = "rpc", %err, "Failed to convert header");
348                            None
349                        }
350                    }
351                })
352                .collect::<Vec<_>>();
353            futures::stream::iter(headers)
354        })
355    }
356
357    /// Returns a stream that yields all logs that match the given filter.
358    fn log_stream(&self, filter: Filter) -> impl Stream<Item = Log> {
359        BroadcastStream::new(self.eth_api.provider().subscribe_to_canonical_state())
360            .map(move |canon_state| {
361                canon_state.expect("new block subscription never ends").block_receipts()
362            })
363            .flat_map(futures::stream::iter)
364            .flat_map(move |(block_receipts, removed)| {
365                let all_logs = logs_utils::matching_block_logs_with_tx_hashes(
366                    &filter,
367                    block_receipts.block,
368                    block_receipts.timestamp,
369                    block_receipts.tx_receipts.iter().map(|(tx, receipt)| (*tx, receipt)),
370                    removed,
371                );
372                futures::stream::iter(all_logs)
373            })
374    }
375}