reth_rpc/eth/
pubsub.rs

1//! `eth_` `PubSub` RPC handler implementation
2
3use std::sync::Arc;
4
5use alloy_primitives::{TxHash, U256};
6use alloy_rpc_types_eth::{
7    pubsub::{Params, PubSubSyncStatus, SubscriptionKind, SyncStatusMetadata},
8    Filter, Header, Log,
9};
10use futures::StreamExt;
11use jsonrpsee::{
12    server::SubscriptionMessage, types::ErrorObject, PendingSubscriptionSink, SubscriptionSink,
13};
14use reth_chain_state::CanonStateSubscriptions;
15use reth_network_api::NetworkInfo;
16use reth_primitives_traits::NodePrimitives;
17use reth_rpc_eth_api::{
18    pubsub::EthPubSubApiServer, EthApiTypes, RpcConvert, RpcNodeCore, RpcTransaction,
19};
20use reth_rpc_eth_types::logs_utils;
21use reth_rpc_server_types::result::{internal_rpc_err, invalid_params_rpc_err};
22use reth_storage_api::BlockNumReader;
23use reth_tasks::{TaskSpawner, TokioTaskExecutor};
24use reth_transaction_pool::{NewTransactionEvent, PoolConsensusTx, TransactionPool};
25use serde::Serialize;
26use tokio_stream::{
27    wrappers::{BroadcastStream, ReceiverStream},
28    Stream,
29};
30use tracing::error;
31
32/// `Eth` pubsub RPC implementation.
33///
34/// This handles `eth_subscribe` RPC calls.
35#[derive(Clone)]
36pub struct EthPubSub<Eth> {
37    /// All nested fields bundled together.
38    inner: Arc<EthPubSubInner<Eth>>,
39}
40
41// === impl EthPubSub ===
42
43impl<Eth> EthPubSub<Eth> {
44    /// Creates a new, shareable instance.
45    ///
46    /// Subscription tasks are spawned via [`tokio::task::spawn`]
47    pub fn new(eth_api: Eth) -> Self {
48        Self::with_spawner(eth_api, Box::<TokioTaskExecutor>::default())
49    }
50
51    /// Creates a new, shareable instance.
52    pub fn with_spawner(eth_api: Eth, subscription_task_spawner: Box<dyn TaskSpawner>) -> Self {
53        let inner = EthPubSubInner { eth_api, subscription_task_spawner };
54        Self { inner: Arc::new(inner) }
55    }
56}
57
58impl<N: NodePrimitives, Eth> EthPubSub<Eth>
59where
60    Eth: RpcNodeCore<
61            Provider: BlockNumReader + CanonStateSubscriptions<Primitives = N>,
62            Pool: TransactionPool,
63            Network: NetworkInfo,
64        > + EthApiTypes<
65            RpcConvert: RpcConvert<
66                Primitives: NodePrimitives<SignedTx = PoolConsensusTx<Eth::Pool>>,
67            >,
68        >,
69{
70    /// Returns the current sync status for the `syncing` subscription
71    pub fn sync_status(&self, is_syncing: bool) -> PubSubSyncStatus {
72        self.inner.sync_status(is_syncing)
73    }
74
75    /// Returns a stream that yields all transaction hashes emitted by the txpool.
76    pub fn pending_transaction_hashes_stream(&self) -> impl Stream<Item = TxHash> {
77        self.inner.pending_transaction_hashes_stream()
78    }
79
80    /// Returns a stream that yields all transactions emitted by the txpool.
81    pub fn full_pending_transaction_stream(
82        &self,
83    ) -> impl Stream<Item = NewTransactionEvent<<Eth::Pool as TransactionPool>::Transaction>> {
84        self.inner.full_pending_transaction_stream()
85    }
86
87    /// Returns a stream that yields all new RPC blocks.
88    pub fn new_headers_stream(&self) -> impl Stream<Item = Header<N::BlockHeader>> {
89        self.inner.new_headers_stream()
90    }
91
92    /// Returns a stream that yields all logs that match the given filter.
93    pub fn log_stream(&self, filter: Filter) -> impl Stream<Item = Log> {
94        self.inner.log_stream(filter)
95    }
96
97    /// The actual handler for an accepted [`EthPubSub::subscribe`] call.
98    pub async fn handle_accepted(
99        &self,
100        accepted_sink: SubscriptionSink,
101        kind: SubscriptionKind,
102        params: Option<Params>,
103    ) -> Result<(), ErrorObject<'static>> {
104        #[allow(unreachable_patterns)]
105        match kind {
106            SubscriptionKind::NewHeads => {
107                pipe_from_stream(accepted_sink, self.new_headers_stream()).await
108            }
109            SubscriptionKind::Logs => {
110                // if no params are provided, used default filter params
111                let filter = match params {
112                    Some(Params::Logs(filter)) => *filter,
113                    Some(Params::Bool(_)) => {
114                        return Err(invalid_params_rpc_err("Invalid params for logs"))
115                    }
116                    _ => Default::default(),
117                };
118                pipe_from_stream(accepted_sink, self.log_stream(filter)).await
119            }
120            SubscriptionKind::NewPendingTransactions => {
121                if let Some(params) = params {
122                    match params {
123                        Params::Bool(true) => {
124                            // full transaction objects requested
125                            let stream = self.full_pending_transaction_stream().filter_map(|tx| {
126                                let tx_value = match self
127                                    .inner
128                                    .eth_api
129                                    .tx_resp_builder()
130                                    .fill_pending(tx.transaction.to_consensus())
131                                {
132                                    Ok(tx) => Some(tx),
133                                    Err(err) => {
134                                        error!(target = "rpc",
135                                            %err,
136                                            "Failed to fill transaction with block context"
137                                        );
138                                        None
139                                    }
140                                };
141                                std::future::ready(tx_value)
142                            });
143                            return pipe_from_stream(accepted_sink, stream).await
144                        }
145                        Params::Bool(false) | Params::None => {
146                            // only hashes requested
147                        }
148                        Params::Logs(_) => {
149                            return Err(invalid_params_rpc_err(
150                                "Invalid params for newPendingTransactions",
151                            ))
152                        }
153                    }
154                }
155
156                pipe_from_stream(accepted_sink, self.pending_transaction_hashes_stream()).await
157            }
158            SubscriptionKind::Syncing => {
159                // get new block subscription
160                let mut canon_state = BroadcastStream::new(
161                    self.inner.eth_api.provider().subscribe_to_canonical_state(),
162                );
163                // get current sync status
164                let mut initial_sync_status = self.inner.eth_api.network().is_syncing();
165                let current_sub_res = self.sync_status(initial_sync_status);
166
167                // send the current status immediately
168                let msg = SubscriptionMessage::new(
169                    accepted_sink.method_name(),
170                    accepted_sink.subscription_id(),
171                    &current_sub_res,
172                )
173                .map_err(SubscriptionSerializeError::new)?;
174
175                if accepted_sink.send(msg).await.is_err() {
176                    return Ok(())
177                }
178
179                while canon_state.next().await.is_some() {
180                    let current_syncing = self.inner.eth_api.network().is_syncing();
181                    // Only send a new response if the sync status has changed
182                    if current_syncing != initial_sync_status {
183                        // Update the sync status on each new block
184                        initial_sync_status = current_syncing;
185
186                        // send a new message now that the status changed
187                        let sync_status = self.sync_status(current_syncing);
188                        let msg = SubscriptionMessage::new(
189                            accepted_sink.method_name(),
190                            accepted_sink.subscription_id(),
191                            &sync_status,
192                        )
193                        .map_err(SubscriptionSerializeError::new)?;
194
195                        if accepted_sink.send(msg).await.is_err() {
196                            break
197                        }
198                    }
199                }
200
201                Ok(())
202            }
203            _ => {
204                // TODO: implement once https://github.com/alloy-rs/alloy/pull/2974 is released
205                Err(invalid_params_rpc_err("Unsupported subscription kind"))
206            }
207        }
208    }
209}
210
211#[async_trait::async_trait]
212impl<Eth> EthPubSubApiServer<RpcTransaction<Eth::NetworkTypes>> for EthPubSub<Eth>
213where
214    Eth: RpcNodeCore<
215            Provider: BlockNumReader + CanonStateSubscriptions,
216            Pool: TransactionPool,
217            Network: NetworkInfo,
218        > + EthApiTypes<
219            RpcConvert: RpcConvert<
220                Primitives: NodePrimitives<SignedTx = PoolConsensusTx<Eth::Pool>>,
221            >,
222        > + 'static,
223{
224    /// Handler for `eth_subscribe`
225    async fn subscribe(
226        &self,
227        pending: PendingSubscriptionSink,
228        kind: SubscriptionKind,
229        params: Option<Params>,
230    ) -> jsonrpsee::core::SubscriptionResult {
231        let sink = pending.accept().await?;
232        let pubsub = self.clone();
233        self.inner.subscription_task_spawner.spawn(Box::pin(async move {
234            let _ = pubsub.handle_accepted(sink, kind, params).await;
235        }));
236
237        Ok(())
238    }
239}
240
241/// Helper to convert a serde error into an [`ErrorObject`]
242#[derive(Debug, thiserror::Error)]
243#[error("Failed to serialize subscription item: {0}")]
244pub struct SubscriptionSerializeError(#[from] serde_json::Error);
245
246impl SubscriptionSerializeError {
247    const fn new(err: serde_json::Error) -> Self {
248        Self(err)
249    }
250}
251
252impl From<SubscriptionSerializeError> for ErrorObject<'static> {
253    fn from(value: SubscriptionSerializeError) -> Self {
254        internal_rpc_err(value.to_string())
255    }
256}
257
258/// Pipes all stream items to the subscription sink.
259async fn pipe_from_stream<T, St>(
260    sink: SubscriptionSink,
261    mut stream: St,
262) -> Result<(), ErrorObject<'static>>
263where
264    St: Stream<Item = T> + Unpin,
265    T: Serialize,
266{
267    loop {
268        tokio::select! {
269            _ = sink.closed() => {
270                // connection dropped
271                break Ok(())
272            },
273            maybe_item = stream.next() => {
274                let item = match maybe_item {
275                    Some(item) => item,
276                    None => {
277                        // stream ended
278                        break  Ok(())
279                    },
280                };
281                let msg = SubscriptionMessage::new(
282                    sink.method_name(),
283                    sink.subscription_id(),
284                    &item
285                ).map_err(SubscriptionSerializeError::new)?;
286
287                if sink.send(msg).await.is_err() {
288                    break Ok(());
289                }
290            }
291        }
292    }
293}
294
295impl<Eth> std::fmt::Debug for EthPubSub<Eth> {
296    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
297        f.debug_struct("EthPubSub").finish_non_exhaustive()
298    }
299}
300
301/// Container type `EthPubSub`
302#[derive(Clone)]
303struct EthPubSubInner<EthApi> {
304    /// The `eth` API.
305    eth_api: EthApi,
306    /// The type that's used to spawn subscription tasks.
307    subscription_task_spawner: Box<dyn TaskSpawner>,
308}
309
// === impl EthPubSubInner ===
311
312impl<Eth> EthPubSubInner<Eth>
313where
314    Eth: RpcNodeCore<Provider: BlockNumReader>,
315{
316    /// Returns the current sync status for the `syncing` subscription
317    fn sync_status(&self, is_syncing: bool) -> PubSubSyncStatus {
318        if is_syncing {
319            let current_block = self
320                .eth_api
321                .provider()
322                .chain_info()
323                .map(|info| info.best_number)
324                .unwrap_or_default();
325            PubSubSyncStatus::Detailed(SyncStatusMetadata {
326                syncing: true,
327                starting_block: 0,
328                current_block,
329                highest_block: Some(current_block),
330            })
331        } else {
332            PubSubSyncStatus::Simple(false)
333        }
334    }
335}
336
337impl<Eth> EthPubSubInner<Eth>
338where
339    Eth: RpcNodeCore<Pool: TransactionPool>,
340{
341    /// Returns a stream that yields all transaction hashes emitted by the txpool.
342    fn pending_transaction_hashes_stream(&self) -> impl Stream<Item = TxHash> {
343        ReceiverStream::new(self.eth_api.pool().pending_transactions_listener())
344    }
345
346    /// Returns a stream that yields all transactions emitted by the txpool.
347    fn full_pending_transaction_stream(
348        &self,
349    ) -> impl Stream<Item = NewTransactionEvent<<Eth::Pool as TransactionPool>::Transaction>> {
350        self.eth_api.pool().new_pending_pool_transactions_listener()
351    }
352}
353
354impl<N: NodePrimitives, Eth> EthPubSubInner<Eth>
355where
356    Eth: RpcNodeCore<Provider: CanonStateSubscriptions<Primitives = N>>,
357{
358    /// Returns a stream that yields all new RPC blocks.
359    fn new_headers_stream(&self) -> impl Stream<Item = Header<N::BlockHeader>> {
360        self.eth_api.provider().canonical_state_stream().flat_map(|new_chain| {
361            let headers = new_chain
362                .committed()
363                .blocks_iter()
364                .map(|block| {
365                    Header::from_consensus(
366                        block.clone_sealed_header().into(),
367                        None,
368                        Some(U256::from(block.rlp_length())),
369                    )
370                })
371                .collect::<Vec<_>>();
372            futures::stream::iter(headers)
373        })
374    }
375
376    /// Returns a stream that yields all logs that match the given filter.
377    fn log_stream(&self, filter: Filter) -> impl Stream<Item = Log> {
378        BroadcastStream::new(self.eth_api.provider().subscribe_to_canonical_state())
379            .map(move |canon_state| {
380                canon_state.expect("new block subscription never ends").block_receipts()
381            })
382            .flat_map(futures::stream::iter)
383            .flat_map(move |(block_receipts, removed)| {
384                let all_logs = logs_utils::matching_block_logs_with_tx_hashes(
385                    &filter,
386                    block_receipts.block,
387                    block_receipts.timestamp,
388                    block_receipts.tx_receipts.iter().map(|(tx, receipt)| (*tx, receipt)),
389                    removed,
390                );
391                futures::stream::iter(all_logs)
392            })
393    }
394}