reth_rpc/eth/
pubsub.rs

1//! `eth_` `PubSub` RPC handler implementation
2
3use std::sync::Arc;
4
5use alloy_primitives::TxHash;
6use alloy_rpc_types_eth::{
7    pubsub::{Params, PubSubSyncStatus, SubscriptionKind, SyncStatusMetadata},
8    Filter, Log,
9};
10use futures::StreamExt;
11use jsonrpsee::{
12    server::SubscriptionMessage, types::ErrorObject, PendingSubscriptionSink, SubscriptionSink,
13};
14use reth_chain_state::CanonStateSubscriptions;
15use reth_network_api::NetworkInfo;
16use reth_rpc_convert::RpcHeader;
17use reth_rpc_eth_api::{
18    pubsub::EthPubSubApiServer, EthApiTypes, RpcConvert, RpcNodeCore, RpcTransaction,
19};
20use reth_rpc_eth_types::logs_utils;
21use reth_rpc_server_types::result::{internal_rpc_err, invalid_params_rpc_err};
22use reth_storage_api::BlockNumReader;
23use reth_tasks::{TaskSpawner, TokioTaskExecutor};
24use reth_transaction_pool::{NewTransactionEvent, TransactionPool};
25use serde::Serialize;
26use tokio_stream::{
27    wrappers::{BroadcastStream, ReceiverStream},
28    Stream,
29};
30use tracing::error;
31
32/// `Eth` pubsub RPC implementation.
33///
34/// This handles `eth_subscribe` RPC calls.
35#[derive(Clone)]
36pub struct EthPubSub<Eth> {
37    /// All nested fields bundled together.
38    inner: Arc<EthPubSubInner<Eth>>,
39}
40
41// === impl EthPubSub ===
42
43impl<Eth> EthPubSub<Eth> {
44    /// Creates a new, shareable instance.
45    ///
46    /// Subscription tasks are spawned via [`tokio::task::spawn`]
47    pub fn new(eth_api: Eth) -> Self {
48        Self::with_spawner(eth_api, Box::<TokioTaskExecutor>::default())
49    }
50
51    /// Creates a new, shareable instance.
52    pub fn with_spawner(eth_api: Eth, subscription_task_spawner: Box<dyn TaskSpawner>) -> Self {
53        let inner = EthPubSubInner { eth_api, subscription_task_spawner };
54        Self { inner: Arc::new(inner) }
55    }
56}
57
58impl<Eth> EthPubSub<Eth>
59where
60    Eth: RpcNodeCore + EthApiTypes<RpcConvert: RpcConvert<Primitives = Eth::Primitives>>,
61{
62    /// Returns the current sync status for the `syncing` subscription
63    pub fn sync_status(&self, is_syncing: bool) -> PubSubSyncStatus {
64        self.inner.sync_status(is_syncing)
65    }
66
67    /// Returns a stream that yields all transaction hashes emitted by the txpool.
68    pub fn pending_transaction_hashes_stream(&self) -> impl Stream<Item = TxHash> {
69        self.inner.pending_transaction_hashes_stream()
70    }
71
72    /// Returns a stream that yields all transactions emitted by the txpool.
73    pub fn full_pending_transaction_stream(
74        &self,
75    ) -> impl Stream<Item = NewTransactionEvent<<Eth::Pool as TransactionPool>::Transaction>> {
76        self.inner.full_pending_transaction_stream()
77    }
78
79    /// Returns a stream that yields all new RPC blocks.
80    pub fn new_headers_stream(&self) -> impl Stream<Item = RpcHeader<Eth::NetworkTypes>> {
81        self.inner.new_headers_stream()
82    }
83
84    /// Returns a stream that yields all logs that match the given filter.
85    pub fn log_stream(&self, filter: Filter) -> impl Stream<Item = Log> {
86        self.inner.log_stream(filter)
87    }
88
89    /// The actual handler for an accepted [`EthPubSub::subscribe`] call.
90    pub async fn handle_accepted(
91        &self,
92        accepted_sink: SubscriptionSink,
93        kind: SubscriptionKind,
94        params: Option<Params>,
95    ) -> Result<(), ErrorObject<'static>> {
96        #[allow(unreachable_patterns)]
97        match kind {
98            SubscriptionKind::NewHeads => {
99                pipe_from_stream(accepted_sink, self.new_headers_stream()).await
100            }
101            SubscriptionKind::Logs => {
102                // if no params are provided, used default filter params
103                let filter = match params {
104                    Some(Params::Logs(filter)) => *filter,
105                    Some(Params::Bool(_)) => {
106                        return Err(invalid_params_rpc_err("Invalid params for logs"))
107                    }
108                    _ => Default::default(),
109                };
110                pipe_from_stream(accepted_sink, self.log_stream(filter)).await
111            }
112            SubscriptionKind::NewPendingTransactions => {
113                if let Some(params) = params {
114                    match params {
115                        Params::Bool(true) => {
116                            // full transaction objects requested
117                            let stream = self.full_pending_transaction_stream().filter_map(|tx| {
118                                let tx_value = match self
119                                    .inner
120                                    .eth_api
121                                    .converter()
122                                    .fill_pending(tx.transaction.to_consensus())
123                                {
124                                    Ok(tx) => Some(tx),
125                                    Err(err) => {
126                                        error!(target = "rpc",
127                                            %err,
128                                            "Failed to fill transaction with block context"
129                                        );
130                                        None
131                                    }
132                                };
133                                std::future::ready(tx_value)
134                            });
135                            return pipe_from_stream(accepted_sink, stream).await
136                        }
137                        Params::Bool(false) | Params::None => {
138                            // only hashes requested
139                        }
140                        Params::Logs(_) => {
141                            return Err(invalid_params_rpc_err(
142                                "Invalid params for newPendingTransactions",
143                            ))
144                        }
145                    }
146                }
147
148                pipe_from_stream(accepted_sink, self.pending_transaction_hashes_stream()).await
149            }
150            SubscriptionKind::Syncing => {
151                // get new block subscription
152                let mut canon_state = BroadcastStream::new(
153                    self.inner.eth_api.provider().subscribe_to_canonical_state(),
154                );
155                // get current sync status
156                let mut initial_sync_status = self.inner.eth_api.network().is_syncing();
157                let current_sub_res = self.sync_status(initial_sync_status);
158
159                // send the current status immediately
160                let msg = SubscriptionMessage::new(
161                    accepted_sink.method_name(),
162                    accepted_sink.subscription_id(),
163                    &current_sub_res,
164                )
165                .map_err(SubscriptionSerializeError::new)?;
166
167                if accepted_sink.send(msg).await.is_err() {
168                    return Ok(())
169                }
170
171                while canon_state.next().await.is_some() {
172                    let current_syncing = self.inner.eth_api.network().is_syncing();
173                    // Only send a new response if the sync status has changed
174                    if current_syncing != initial_sync_status {
175                        // Update the sync status on each new block
176                        initial_sync_status = current_syncing;
177
178                        // send a new message now that the status changed
179                        let sync_status = self.sync_status(current_syncing);
180                        let msg = SubscriptionMessage::new(
181                            accepted_sink.method_name(),
182                            accepted_sink.subscription_id(),
183                            &sync_status,
184                        )
185                        .map_err(SubscriptionSerializeError::new)?;
186
187                        if accepted_sink.send(msg).await.is_err() {
188                            break
189                        }
190                    }
191                }
192
193                Ok(())
194            }
195            _ => {
196                // TODO: implement once https://github.com/alloy-rs/alloy/pull/2974 is released
197                Err(invalid_params_rpc_err("Unsupported subscription kind"))
198            }
199        }
200    }
201}
202
203#[async_trait::async_trait]
204impl<Eth> EthPubSubApiServer<RpcTransaction<Eth::NetworkTypes>> for EthPubSub<Eth>
205where
206    Eth: RpcNodeCore + EthApiTypes<RpcConvert: RpcConvert<Primitives = Eth::Primitives>>,
207{
208    /// Handler for `eth_subscribe`
209    async fn subscribe(
210        &self,
211        pending: PendingSubscriptionSink,
212        kind: SubscriptionKind,
213        params: Option<Params>,
214    ) -> jsonrpsee::core::SubscriptionResult {
215        let sink = pending.accept().await?;
216        let pubsub = self.clone();
217        self.inner.subscription_task_spawner.spawn(Box::pin(async move {
218            let _ = pubsub.handle_accepted(sink, kind, params).await;
219        }));
220
221        Ok(())
222    }
223}
224
225/// Helper to convert a serde error into an [`ErrorObject`]
226#[derive(Debug, thiserror::Error)]
227#[error("Failed to serialize subscription item: {0}")]
228pub struct SubscriptionSerializeError(#[from] serde_json::Error);
229
230impl SubscriptionSerializeError {
231    const fn new(err: serde_json::Error) -> Self {
232        Self(err)
233    }
234}
235
236impl From<SubscriptionSerializeError> for ErrorObject<'static> {
237    fn from(value: SubscriptionSerializeError) -> Self {
238        internal_rpc_err(value.to_string())
239    }
240}
241
242/// Pipes all stream items to the subscription sink.
243async fn pipe_from_stream<T, St>(
244    sink: SubscriptionSink,
245    mut stream: St,
246) -> Result<(), ErrorObject<'static>>
247where
248    St: Stream<Item = T> + Unpin,
249    T: Serialize,
250{
251    loop {
252        tokio::select! {
253            _ = sink.closed() => {
254                // connection dropped
255                break Ok(())
256            },
257            maybe_item = stream.next() => {
258                let item = match maybe_item {
259                    Some(item) => item,
260                    None => {
261                        // stream ended
262                        break  Ok(())
263                    },
264                };
265                let msg = SubscriptionMessage::new(
266                    sink.method_name(),
267                    sink.subscription_id(),
268                    &item
269                ).map_err(SubscriptionSerializeError::new)?;
270
271                if sink.send(msg).await.is_err() {
272                    break Ok(());
273                }
274            }
275        }
276    }
277}
278
279impl<Eth> std::fmt::Debug for EthPubSub<Eth> {
280    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
281        f.debug_struct("EthPubSub").finish_non_exhaustive()
282    }
283}
284
285/// Container type `EthPubSub`
286#[derive(Clone)]
287struct EthPubSubInner<EthApi> {
288    /// The `eth` API.
289    eth_api: EthApi,
290    /// The type that's used to spawn subscription tasks.
291    subscription_task_spawner: Box<dyn TaskSpawner>,
292}
293
294// == impl EthPubSubInner ===
295
296impl<Eth> EthPubSubInner<Eth>
297where
298    Eth: RpcNodeCore<Provider: BlockNumReader>,
299{
300    /// Returns the current sync status for the `syncing` subscription
301    fn sync_status(&self, is_syncing: bool) -> PubSubSyncStatus {
302        if is_syncing {
303            let current_block = self
304                .eth_api
305                .provider()
306                .chain_info()
307                .map(|info| info.best_number)
308                .unwrap_or_default();
309            PubSubSyncStatus::Detailed(SyncStatusMetadata {
310                syncing: true,
311                starting_block: 0,
312                current_block,
313                highest_block: Some(current_block),
314            })
315        } else {
316            PubSubSyncStatus::Simple(false)
317        }
318    }
319}
320
321impl<Eth> EthPubSubInner<Eth>
322where
323    Eth: RpcNodeCore<Pool: TransactionPool>,
324{
325    /// Returns a stream that yields all transaction hashes emitted by the txpool.
326    fn pending_transaction_hashes_stream(&self) -> impl Stream<Item = TxHash> {
327        ReceiverStream::new(self.eth_api.pool().pending_transactions_listener())
328    }
329
330    /// Returns a stream that yields all transactions emitted by the txpool.
331    fn full_pending_transaction_stream(
332        &self,
333    ) -> impl Stream<Item = NewTransactionEvent<<Eth::Pool as TransactionPool>::Transaction>> {
334        self.eth_api.pool().new_pending_pool_transactions_listener()
335    }
336}
337
338impl<Eth> EthPubSubInner<Eth>
339where
340    Eth: EthApiTypes<RpcConvert: RpcConvert<Primitives = Eth::Primitives>> + RpcNodeCore,
341{
342    /// Returns a stream that yields all new RPC blocks.
343    fn new_headers_stream(&self) -> impl Stream<Item = RpcHeader<Eth::NetworkTypes>> {
344        let converter = self.eth_api.converter();
345        self.eth_api.provider().canonical_state_stream().flat_map(|new_chain| {
346            let headers = new_chain
347                .committed()
348                .blocks_iter()
349                .filter_map(|block| {
350                    match converter.convert_header(block.clone_sealed_header(), block.rlp_length())
351                    {
352                        Ok(header) => Some(header),
353                        Err(err) => {
354                            error!(target = "rpc", %err, "Failed to convert header");
355                            None
356                        }
357                    }
358                })
359                .collect::<Vec<_>>();
360            futures::stream::iter(headers)
361        })
362    }
363
364    /// Returns a stream that yields all logs that match the given filter.
365    fn log_stream(&self, filter: Filter) -> impl Stream<Item = Log> {
366        BroadcastStream::new(self.eth_api.provider().subscribe_to_canonical_state())
367            .map(move |canon_state| {
368                canon_state.expect("new block subscription never ends").block_receipts()
369            })
370            .flat_map(futures::stream::iter)
371            .flat_map(move |(block_receipts, removed)| {
372                let all_logs = logs_utils::matching_block_logs_with_tx_hashes(
373                    &filter,
374                    block_receipts.block,
375                    block_receipts.timestamp,
376                    block_receipts.tx_receipts.iter().map(|(tx, receipt)| (*tx, receipt)),
377                    removed,
378                );
379                futures::stream::iter(all_logs)
380            })
381    }
382}