reth_rpc/eth/
pubsub.rs

1//! `eth_` `PubSub` RPC handler implementation
2
3use std::sync::Arc;
4
5use alloy_primitives::TxHash;
6use alloy_rpc_types_eth::{
7    pubsub::{Params, PubSubSyncStatus, SubscriptionKind, SyncStatusMetadata},
8    FilteredParams, Header, Log,
9};
10use futures::StreamExt;
11use jsonrpsee::{
12    server::SubscriptionMessage, types::ErrorObject, PendingSubscriptionSink, SubscriptionSink,
13};
14use reth_chain_state::CanonStateSubscriptions;
15use reth_network_api::NetworkInfo;
16use reth_primitives_traits::NodePrimitives;
17use reth_rpc_eth_api::{
18    pubsub::EthPubSubApiServer, EthApiTypes, RpcNodeCore, RpcTransaction, TransactionCompat,
19};
20use reth_rpc_eth_types::logs_utils;
21use reth_rpc_server_types::result::{internal_rpc_err, invalid_params_rpc_err};
22use reth_storage_api::BlockNumReader;
23use reth_tasks::{TaskSpawner, TokioTaskExecutor};
24use reth_transaction_pool::{NewTransactionEvent, PoolConsensusTx, TransactionPool};
25use serde::Serialize;
26use tokio_stream::{
27    wrappers::{BroadcastStream, ReceiverStream},
28    Stream,
29};
30use tracing::error;
31
32/// `Eth` pubsub RPC implementation.
33///
34/// This handles `eth_subscribe` RPC calls.
35#[derive(Clone)]
36pub struct EthPubSub<Eth> {
37    /// All nested fields bundled together.
38    inner: Arc<EthPubSubInner<Eth>>,
39    /// The type that's used to spawn subscription tasks.
40    subscription_task_spawner: Box<dyn TaskSpawner>,
41}
42
43// === impl EthPubSub ===
44
45impl<Eth> EthPubSub<Eth> {
46    /// Creates a new, shareable instance.
47    ///
48    /// Subscription tasks are spawned via [`tokio::task::spawn`]
49    pub fn new(eth_api: Eth) -> Self {
50        Self::with_spawner(eth_api, Box::<TokioTaskExecutor>::default())
51    }
52
53    /// Creates a new, shareable instance.
54    pub fn with_spawner(eth_api: Eth, subscription_task_spawner: Box<dyn TaskSpawner>) -> Self {
55        let inner = EthPubSubInner { eth_api };
56        Self { inner: Arc::new(inner), subscription_task_spawner }
57    }
58}
59
60#[async_trait::async_trait]
61impl<Eth> EthPubSubApiServer<RpcTransaction<Eth::NetworkTypes>> for EthPubSub<Eth>
62where
63    Eth: RpcNodeCore<
64            Provider: BlockNumReader + CanonStateSubscriptions,
65            Pool: TransactionPool,
66            Network: NetworkInfo,
67        > + EthApiTypes<TransactionCompat: TransactionCompat<PoolConsensusTx<Eth::Pool>>>
68        + 'static,
69{
70    /// Handler for `eth_subscribe`
71    async fn subscribe(
72        &self,
73        pending: PendingSubscriptionSink,
74        kind: SubscriptionKind,
75        params: Option<Params>,
76    ) -> jsonrpsee::core::SubscriptionResult {
77        let sink = pending.accept().await?;
78        let pubsub = self.inner.clone();
79        self.subscription_task_spawner.spawn(Box::pin(async move {
80            let _ = handle_accepted(pubsub, sink, kind, params).await;
81        }));
82
83        Ok(())
84    }
85}
86
87/// The actual handler for an accepted [`EthPubSub::subscribe`] call.
88async fn handle_accepted<Eth>(
89    pubsub: Arc<EthPubSubInner<Eth>>,
90    accepted_sink: SubscriptionSink,
91    kind: SubscriptionKind,
92    params: Option<Params>,
93) -> Result<(), ErrorObject<'static>>
94where
95    Eth: RpcNodeCore<
96            Provider: BlockNumReader + CanonStateSubscriptions,
97            Pool: TransactionPool,
98            Network: NetworkInfo,
99        > + EthApiTypes<TransactionCompat: TransactionCompat<PoolConsensusTx<Eth::Pool>>>,
100{
101    match kind {
102        SubscriptionKind::NewHeads => {
103            pipe_from_stream(accepted_sink, pubsub.new_headers_stream()).await
104        }
105        SubscriptionKind::Logs => {
106            // if no params are provided, used default filter params
107            let filter = match params {
108                Some(Params::Logs(filter)) => FilteredParams::new(Some(*filter)),
109                Some(Params::Bool(_)) => {
110                    return Err(invalid_params_rpc_err("Invalid params for logs"))
111                }
112                _ => FilteredParams::default(),
113            };
114            pipe_from_stream(accepted_sink, pubsub.log_stream(filter)).await
115        }
116        SubscriptionKind::NewPendingTransactions => {
117            if let Some(params) = params {
118                match params {
119                    Params::Bool(true) => {
120                        // full transaction objects requested
121                        let stream = pubsub.full_pending_transaction_stream().filter_map(|tx| {
122                            let tx_value = match pubsub
123                                .eth_api
124                                .tx_resp_builder()
125                                .fill_pending(tx.transaction.to_consensus())
126                            {
127                                Ok(tx) => Some(tx),
128                                Err(err) => {
129                                    error!(target = "rpc",
130                                        %err,
131                                        "Failed to fill transaction with block context"
132                                    );
133                                    None
134                                }
135                            };
136                            std::future::ready(tx_value)
137                        });
138                        return pipe_from_stream(accepted_sink, stream).await
139                    }
140                    Params::Bool(false) | Params::None => {
141                        // only hashes requested
142                    }
143                    Params::Logs(_) => {
144                        return Err(invalid_params_rpc_err(
145                            "Invalid params for newPendingTransactions",
146                        ))
147                    }
148                }
149            }
150
151            pipe_from_stream(accepted_sink, pubsub.pending_transaction_hashes_stream()).await
152        }
153        SubscriptionKind::Syncing => {
154            // get new block subscription
155            let mut canon_state =
156                BroadcastStream::new(pubsub.eth_api.provider().subscribe_to_canonical_state());
157            // get current sync status
158            let mut initial_sync_status = pubsub.eth_api.network().is_syncing();
159            let current_sub_res = pubsub.sync_status(initial_sync_status);
160
161            // send the current status immediately
162            let msg = SubscriptionMessage::from_json(&current_sub_res)
163                .map_err(SubscriptionSerializeError::new)?;
164            if accepted_sink.send(msg).await.is_err() {
165                return Ok(())
166            }
167
168            while canon_state.next().await.is_some() {
169                let current_syncing = pubsub.eth_api.network().is_syncing();
170                // Only send a new response if the sync status has changed
171                if current_syncing != initial_sync_status {
172                    // Update the sync status on each new block
173                    initial_sync_status = current_syncing;
174
175                    // send a new message now that the status changed
176                    let sync_status = pubsub.sync_status(current_syncing);
177                    let msg = SubscriptionMessage::from_json(&sync_status)
178                        .map_err(SubscriptionSerializeError::new)?;
179                    if accepted_sink.send(msg).await.is_err() {
180                        break
181                    }
182                }
183            }
184
185            Ok(())
186        }
187    }
188}
189
190/// Helper to convert a serde error into an [`ErrorObject`]
191#[derive(Debug, thiserror::Error)]
192#[error("Failed to serialize subscription item: {0}")]
193pub struct SubscriptionSerializeError(#[from] serde_json::Error);
194
195impl SubscriptionSerializeError {
196    const fn new(err: serde_json::Error) -> Self {
197        Self(err)
198    }
199}
200
201impl From<SubscriptionSerializeError> for ErrorObject<'static> {
202    fn from(value: SubscriptionSerializeError) -> Self {
203        internal_rpc_err(value.to_string())
204    }
205}
206
207/// Pipes all stream items to the subscription sink.
208async fn pipe_from_stream<T, St>(
209    sink: SubscriptionSink,
210    mut stream: St,
211) -> Result<(), ErrorObject<'static>>
212where
213    St: Stream<Item = T> + Unpin,
214    T: Serialize,
215{
216    loop {
217        tokio::select! {
218            _ = sink.closed() => {
219                // connection dropped
220                break Ok(())
221            },
222            maybe_item = stream.next() => {
223                let item = match maybe_item {
224                    Some(item) => item,
225                    None => {
226                        // stream ended
227                        break  Ok(())
228                    },
229                };
230                let msg = SubscriptionMessage::from_json(&item).map_err(SubscriptionSerializeError::new)?;
231                if sink.send(msg).await.is_err() {
232                    break Ok(());
233                }
234            }
235        }
236    }
237}
238
239impl<Eth> std::fmt::Debug for EthPubSub<Eth> {
240    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
241        f.debug_struct("EthPubSub").finish_non_exhaustive()
242    }
243}
244
245/// Container type `EthPubSub`
246#[derive(Clone)]
247struct EthPubSubInner<EthApi> {
248    /// The `eth` API.
249    eth_api: EthApi,
250}
251
252// == impl EthPubSubInner ===
253
254impl<Eth> EthPubSubInner<Eth>
255where
256    Eth: RpcNodeCore<Provider: BlockNumReader>,
257{
258    /// Returns the current sync status for the `syncing` subscription
259    fn sync_status(&self, is_syncing: bool) -> PubSubSyncStatus {
260        if is_syncing {
261            let current_block = self
262                .eth_api
263                .provider()
264                .chain_info()
265                .map(|info| info.best_number)
266                .unwrap_or_default();
267            PubSubSyncStatus::Detailed(SyncStatusMetadata {
268                syncing: true,
269                starting_block: 0,
270                current_block,
271                highest_block: Some(current_block),
272            })
273        } else {
274            PubSubSyncStatus::Simple(false)
275        }
276    }
277}
278
279impl<Eth> EthPubSubInner<Eth>
280where
281    Eth: RpcNodeCore<Pool: TransactionPool>,
282{
283    /// Returns a stream that yields all transaction hashes emitted by the txpool.
284    fn pending_transaction_hashes_stream(&self) -> impl Stream<Item = TxHash> {
285        ReceiverStream::new(self.eth_api.pool().pending_transactions_listener())
286    }
287
288    /// Returns a stream that yields all transactions emitted by the txpool.
289    fn full_pending_transaction_stream(
290        &self,
291    ) -> impl Stream<Item = NewTransactionEvent<<Eth::Pool as TransactionPool>::Transaction>> {
292        self.eth_api.pool().new_pending_pool_transactions_listener()
293    }
294}
295
296impl<N: NodePrimitives, Eth> EthPubSubInner<Eth>
297where
298    Eth: RpcNodeCore<Provider: CanonStateSubscriptions<Primitives = N>>,
299{
300    /// Returns a stream that yields all new RPC blocks.
301    fn new_headers_stream(&self) -> impl Stream<Item = Header<N::BlockHeader>> {
302        self.eth_api.provider().canonical_state_stream().flat_map(|new_chain| {
303            let headers = new_chain.committed().headers().collect::<Vec<_>>();
304            futures::stream::iter(
305                headers.into_iter().map(|h| Header::from_consensus(h.into(), None, None)),
306            )
307        })
308    }
309
310    /// Returns a stream that yields all logs that match the given filter.
311    fn log_stream(&self, filter: FilteredParams) -> impl Stream<Item = Log> {
312        BroadcastStream::new(self.eth_api.provider().subscribe_to_canonical_state())
313            .map(move |canon_state| {
314                canon_state.expect("new block subscription never ends").block_receipts()
315            })
316            .flat_map(futures::stream::iter)
317            .flat_map(move |(block_receipts, removed)| {
318                let all_logs = logs_utils::matching_block_logs_with_tx_hashes(
319                    &filter,
320                    block_receipts.block,
321                    block_receipts.tx_receipts.iter().map(|(tx, receipt)| (*tx, receipt)),
322                    removed,
323                );
324                futures::stream::iter(all_logs)
325            })
326    }
327}