reth_rpc/eth/
pubsub.rs

1//! `eth_` `PubSub` RPC handler implementation
2
3use std::sync::Arc;
4
5use alloy_primitives::{TxHash, U256};
6use alloy_rpc_types_eth::{
7    pubsub::{Params, PubSubSyncStatus, SubscriptionKind, SyncStatusMetadata},
8    Filter, Header, Log,
9};
10use futures::StreamExt;
11use jsonrpsee::{
12    server::SubscriptionMessage, types::ErrorObject, PendingSubscriptionSink, SubscriptionSink,
13};
14use reth_chain_state::CanonStateSubscriptions;
15use reth_network_api::NetworkInfo;
16use reth_primitives_traits::NodePrimitives;
17use reth_rpc_eth_api::{
18    pubsub::EthPubSubApiServer, EthApiTypes, RpcConvert, RpcNodeCore, RpcTransaction,
19};
20use reth_rpc_eth_types::logs_utils;
21use reth_rpc_server_types::result::{internal_rpc_err, invalid_params_rpc_err};
22use reth_storage_api::BlockNumReader;
23use reth_tasks::{TaskSpawner, TokioTaskExecutor};
24use reth_transaction_pool::{NewTransactionEvent, PoolConsensusTx, TransactionPool};
25use serde::Serialize;
26use tokio_stream::{
27    wrappers::{BroadcastStream, ReceiverStream},
28    Stream,
29};
30use tracing::error;
31
32/// `Eth` pubsub RPC implementation.
33///
34/// This handles `eth_subscribe` RPC calls.
35#[derive(Clone)]
36pub struct EthPubSub<Eth> {
37    /// All nested fields bundled together.
38    inner: Arc<EthPubSubInner<Eth>>,
39}
40
41// === impl EthPubSub ===
42
43impl<Eth> EthPubSub<Eth> {
44    /// Creates a new, shareable instance.
45    ///
46    /// Subscription tasks are spawned via [`tokio::task::spawn`]
47    pub fn new(eth_api: Eth) -> Self {
48        Self::with_spawner(eth_api, Box::<TokioTaskExecutor>::default())
49    }
50
51    /// Creates a new, shareable instance.
52    pub fn with_spawner(eth_api: Eth, subscription_task_spawner: Box<dyn TaskSpawner>) -> Self {
53        let inner = EthPubSubInner { eth_api, subscription_task_spawner };
54        Self { inner: Arc::new(inner) }
55    }
56}
57
58impl<N: NodePrimitives, Eth> EthPubSub<Eth>
59where
60    Eth: RpcNodeCore<
61            Provider: BlockNumReader + CanonStateSubscriptions<Primitives = N>,
62            Pool: TransactionPool,
63            Network: NetworkInfo,
64        > + EthApiTypes<
65            RpcConvert: RpcConvert<
66                Primitives: NodePrimitives<SignedTx = PoolConsensusTx<Eth::Pool>>,
67            >,
68        >,
69{
70    /// Returns the current sync status for the `syncing` subscription
71    pub fn sync_status(&self, is_syncing: bool) -> PubSubSyncStatus {
72        self.inner.sync_status(is_syncing)
73    }
74
75    /// Returns a stream that yields all transaction hashes emitted by the txpool.
76    pub fn pending_transaction_hashes_stream(&self) -> impl Stream<Item = TxHash> {
77        self.inner.pending_transaction_hashes_stream()
78    }
79
80    /// Returns a stream that yields all transactions emitted by the txpool.
81    pub fn full_pending_transaction_stream(
82        &self,
83    ) -> impl Stream<Item = NewTransactionEvent<<Eth::Pool as TransactionPool>::Transaction>> {
84        self.inner.full_pending_transaction_stream()
85    }
86
87    /// Returns a stream that yields all new RPC blocks.
88    pub fn new_headers_stream(&self) -> impl Stream<Item = Header<N::BlockHeader>> {
89        self.inner.new_headers_stream()
90    }
91
92    /// Returns a stream that yields all logs that match the given filter.
93    pub fn log_stream(&self, filter: Filter) -> impl Stream<Item = Log> {
94        self.inner.log_stream(filter)
95    }
96
97    /// The actual handler for an accepted [`EthPubSub::subscribe`] call.
98    pub async fn handle_accepted(
99        &self,
100        accepted_sink: SubscriptionSink,
101        kind: SubscriptionKind,
102        params: Option<Params>,
103    ) -> Result<(), ErrorObject<'static>> {
104        match kind {
105            SubscriptionKind::NewHeads => {
106                pipe_from_stream(accepted_sink, self.new_headers_stream()).await
107            }
108            SubscriptionKind::Logs => {
109                // if no params are provided, used default filter params
110                let filter = match params {
111                    Some(Params::Logs(filter)) => *filter,
112                    Some(Params::Bool(_)) => {
113                        return Err(invalid_params_rpc_err("Invalid params for logs"))
114                    }
115                    _ => Default::default(),
116                };
117                pipe_from_stream(accepted_sink, self.log_stream(filter)).await
118            }
119            SubscriptionKind::NewPendingTransactions => {
120                if let Some(params) = params {
121                    match params {
122                        Params::Bool(true) => {
123                            // full transaction objects requested
124                            let stream = self.full_pending_transaction_stream().filter_map(|tx| {
125                                let tx_value = match self
126                                    .inner
127                                    .eth_api
128                                    .tx_resp_builder()
129                                    .fill_pending(tx.transaction.to_consensus())
130                                {
131                                    Ok(tx) => Some(tx),
132                                    Err(err) => {
133                                        error!(target = "rpc",
134                                            %err,
135                                            "Failed to fill transaction with block context"
136                                        );
137                                        None
138                                    }
139                                };
140                                std::future::ready(tx_value)
141                            });
142                            return pipe_from_stream(accepted_sink, stream).await
143                        }
144                        Params::Bool(false) | Params::None => {
145                            // only hashes requested
146                        }
147                        Params::Logs(_) => {
148                            return Err(invalid_params_rpc_err(
149                                "Invalid params for newPendingTransactions",
150                            ))
151                        }
152                    }
153                }
154
155                pipe_from_stream(accepted_sink, self.pending_transaction_hashes_stream()).await
156            }
157            SubscriptionKind::Syncing => {
158                // get new block subscription
159                let mut canon_state = BroadcastStream::new(
160                    self.inner.eth_api.provider().subscribe_to_canonical_state(),
161                );
162                // get current sync status
163                let mut initial_sync_status = self.inner.eth_api.network().is_syncing();
164                let current_sub_res = self.sync_status(initial_sync_status);
165
166                // send the current status immediately
167                let msg = SubscriptionMessage::new(
168                    accepted_sink.method_name(),
169                    accepted_sink.subscription_id(),
170                    &current_sub_res,
171                )
172                .map_err(SubscriptionSerializeError::new)?;
173
174                if accepted_sink.send(msg).await.is_err() {
175                    return Ok(())
176                }
177
178                while canon_state.next().await.is_some() {
179                    let current_syncing = self.inner.eth_api.network().is_syncing();
180                    // Only send a new response if the sync status has changed
181                    if current_syncing != initial_sync_status {
182                        // Update the sync status on each new block
183                        initial_sync_status = current_syncing;
184
185                        // send a new message now that the status changed
186                        let sync_status = self.sync_status(current_syncing);
187                        let msg = SubscriptionMessage::new(
188                            accepted_sink.method_name(),
189                            accepted_sink.subscription_id(),
190                            &sync_status,
191                        )
192                        .map_err(SubscriptionSerializeError::new)?;
193
194                        if accepted_sink.send(msg).await.is_err() {
195                            break
196                        }
197                    }
198                }
199
200                Ok(())
201            }
202        }
203    }
204}
205
206#[async_trait::async_trait]
207impl<Eth> EthPubSubApiServer<RpcTransaction<Eth::NetworkTypes>> for EthPubSub<Eth>
208where
209    Eth: RpcNodeCore<
210            Provider: BlockNumReader + CanonStateSubscriptions,
211            Pool: TransactionPool,
212            Network: NetworkInfo,
213        > + EthApiTypes<
214            RpcConvert: RpcConvert<
215                Primitives: NodePrimitives<SignedTx = PoolConsensusTx<Eth::Pool>>,
216            >,
217        > + 'static,
218{
219    /// Handler for `eth_subscribe`
220    async fn subscribe(
221        &self,
222        pending: PendingSubscriptionSink,
223        kind: SubscriptionKind,
224        params: Option<Params>,
225    ) -> jsonrpsee::core::SubscriptionResult {
226        let sink = pending.accept().await?;
227        let pubsub = self.clone();
228        self.inner.subscription_task_spawner.spawn(Box::pin(async move {
229            let _ = pubsub.handle_accepted(sink, kind, params).await;
230        }));
231
232        Ok(())
233    }
234}
235
236/// Helper to convert a serde error into an [`ErrorObject`]
237#[derive(Debug, thiserror::Error)]
238#[error("Failed to serialize subscription item: {0}")]
239pub struct SubscriptionSerializeError(#[from] serde_json::Error);
240
241impl SubscriptionSerializeError {
242    const fn new(err: serde_json::Error) -> Self {
243        Self(err)
244    }
245}
246
247impl From<SubscriptionSerializeError> for ErrorObject<'static> {
248    fn from(value: SubscriptionSerializeError) -> Self {
249        internal_rpc_err(value.to_string())
250    }
251}
252
253/// Pipes all stream items to the subscription sink.
254async fn pipe_from_stream<T, St>(
255    sink: SubscriptionSink,
256    mut stream: St,
257) -> Result<(), ErrorObject<'static>>
258where
259    St: Stream<Item = T> + Unpin,
260    T: Serialize,
261{
262    loop {
263        tokio::select! {
264            _ = sink.closed() => {
265                // connection dropped
266                break Ok(())
267            },
268            maybe_item = stream.next() => {
269                let item = match maybe_item {
270                    Some(item) => item,
271                    None => {
272                        // stream ended
273                        break  Ok(())
274                    },
275                };
276                let msg = SubscriptionMessage::new(
277                    sink.method_name(),
278                    sink.subscription_id(),
279                    &item
280                ).map_err(SubscriptionSerializeError::new)?;
281
282                if sink.send(msg).await.is_err() {
283                    break Ok(());
284                }
285            }
286        }
287    }
288}
289
290impl<Eth> std::fmt::Debug for EthPubSub<Eth> {
291    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
292        f.debug_struct("EthPubSub").finish_non_exhaustive()
293    }
294}
295
296/// Container type `EthPubSub`
297#[derive(Clone)]
298struct EthPubSubInner<EthApi> {
299    /// The `eth` API.
300    eth_api: EthApi,
301    /// The type that's used to spawn subscription tasks.
302    subscription_task_spawner: Box<dyn TaskSpawner>,
303}
304
305// == impl EthPubSubInner ===
306
307impl<Eth> EthPubSubInner<Eth>
308where
309    Eth: RpcNodeCore<Provider: BlockNumReader>,
310{
311    /// Returns the current sync status for the `syncing` subscription
312    fn sync_status(&self, is_syncing: bool) -> PubSubSyncStatus {
313        if is_syncing {
314            let current_block = self
315                .eth_api
316                .provider()
317                .chain_info()
318                .map(|info| info.best_number)
319                .unwrap_or_default();
320            PubSubSyncStatus::Detailed(SyncStatusMetadata {
321                syncing: true,
322                starting_block: 0,
323                current_block,
324                highest_block: Some(current_block),
325            })
326        } else {
327            PubSubSyncStatus::Simple(false)
328        }
329    }
330}
331
332impl<Eth> EthPubSubInner<Eth>
333where
334    Eth: RpcNodeCore<Pool: TransactionPool>,
335{
336    /// Returns a stream that yields all transaction hashes emitted by the txpool.
337    fn pending_transaction_hashes_stream(&self) -> impl Stream<Item = TxHash> {
338        ReceiverStream::new(self.eth_api.pool().pending_transactions_listener())
339    }
340
341    /// Returns a stream that yields all transactions emitted by the txpool.
342    fn full_pending_transaction_stream(
343        &self,
344    ) -> impl Stream<Item = NewTransactionEvent<<Eth::Pool as TransactionPool>::Transaction>> {
345        self.eth_api.pool().new_pending_pool_transactions_listener()
346    }
347}
348
349impl<N: NodePrimitives, Eth> EthPubSubInner<Eth>
350where
351    Eth: RpcNodeCore<Provider: CanonStateSubscriptions<Primitives = N>>,
352{
353    /// Returns a stream that yields all new RPC blocks.
354    fn new_headers_stream(&self) -> impl Stream<Item = Header<N::BlockHeader>> {
355        self.eth_api.provider().canonical_state_stream().flat_map(|new_chain| {
356            let headers = new_chain
357                .committed()
358                .blocks_iter()
359                .map(|block| {
360                    Header::from_consensus(
361                        block.clone_sealed_header().into(),
362                        None,
363                        Some(U256::from(block.rlp_length())),
364                    )
365                })
366                .collect::<Vec<_>>();
367            futures::stream::iter(headers)
368        })
369    }
370
371    /// Returns a stream that yields all logs that match the given filter.
372    fn log_stream(&self, filter: Filter) -> impl Stream<Item = Log> {
373        BroadcastStream::new(self.eth_api.provider().subscribe_to_canonical_state())
374            .map(move |canon_state| {
375                canon_state.expect("new block subscription never ends").block_receipts()
376            })
377            .flat_map(futures::stream::iter)
378            .flat_map(move |(block_receipts, removed)| {
379                let all_logs = logs_utils::matching_block_logs_with_tx_hashes(
380                    &filter,
381                    block_receipts.block,
382                    block_receipts.timestamp,
383                    block_receipts.tx_receipts.iter().map(|(tx, receipt)| (*tx, receipt)),
384                    removed,
385                );
386                futures::stream::iter(all_logs)
387            })
388    }
389}