Skip to main content

reth_rpc/eth/
pubsub.rs

1//! `eth_` `PubSub` RPC handler implementation
2
3use std::sync::Arc;
4
5use alloy_consensus::{transaction::TxHashRef, BlockHeader, TxReceipt};
6use alloy_primitives::TxHash;
7use alloy_rpc_types_eth::{
8    pubsub::{
9        Params, PubSubSyncStatus, SubscriptionKind, SyncStatusMetadata, TransactionReceiptsParams,
10    },
11    Filter, Log,
12};
13use futures::StreamExt;
14use jsonrpsee::{
15    server::SubscriptionMessage, types::ErrorObject, PendingSubscriptionSink, SubscriptionSink,
16};
17use reth_chain_state::CanonStateSubscriptions;
18use reth_network_api::NetworkInfo;
19use reth_primitives_traits::TransactionMeta;
20use reth_rpc_convert::{transaction::ConvertReceiptInput, RpcHeader};
21use reth_rpc_eth_api::{
22    pubsub::EthPubSubApiServer, EthApiTypes, RpcConvert, RpcNodeCore, RpcTransaction,
23};
24use reth_rpc_eth_types::logs_utils;
25use reth_rpc_server_types::result::{internal_rpc_err, invalid_params_rpc_err};
26use reth_storage_api::BlockNumReader;
27use reth_tasks::Runtime;
28use reth_transaction_pool::{NewTransactionEvent, TransactionPool};
29use serde::Serialize;
30use tokio_stream::{
31    wrappers::{BroadcastStream, ReceiverStream},
32    Stream,
33};
34use tracing::error;
35
36/// `Eth` pubsub RPC implementation.
37///
38/// This handles `eth_subscribe` RPC calls.
39#[derive(Clone)]
40pub struct EthPubSub<Eth> {
41    /// All nested fields bundled together.
42    inner: Arc<EthPubSubInner<Eth>>,
43}
44
45// === impl EthPubSub ===
46
47impl<Eth> EthPubSub<Eth> {
48    /// Creates a new, shareable instance.
49    pub fn new(eth_api: Eth, subscription_task_spawner: Runtime) -> Self {
50        let inner = EthPubSubInner { eth_api, subscription_task_spawner };
51        Self { inner: Arc::new(inner) }
52    }
53}
54
55impl<Eth> EthPubSub<Eth>
56where
57    Eth: RpcNodeCore + EthApiTypes<RpcConvert: RpcConvert<Primitives = Eth::Primitives>>,
58{
59    /// Returns the current sync status for the `syncing` subscription
60    pub fn sync_status(&self, is_syncing: bool) -> PubSubSyncStatus {
61        self.inner.sync_status(is_syncing)
62    }
63
64    /// Returns a stream that yields all transaction hashes emitted by the txpool.
65    pub fn pending_transaction_hashes_stream(&self) -> impl Stream<Item = TxHash> {
66        self.inner.pending_transaction_hashes_stream()
67    }
68
69    /// Returns a stream that yields all transactions emitted by the txpool.
70    pub fn full_pending_transaction_stream(
71        &self,
72    ) -> impl Stream<Item = NewTransactionEvent<<Eth::Pool as TransactionPool>::Transaction>> {
73        self.inner.full_pending_transaction_stream()
74    }
75
76    /// Returns a stream that yields all new RPC blocks.
77    pub fn new_headers_stream(&self) -> impl Stream<Item = RpcHeader<Eth::NetworkTypes>> {
78        self.inner.new_headers_stream()
79    }
80
81    /// Returns a stream that yields all logs that match the given filter.
82    pub fn log_stream(&self, filter: Filter) -> impl Stream<Item = Log> {
83        self.inner.log_stream(filter)
84    }
85
86    /// The actual handler for an accepted [`EthPubSub::subscribe`] call.
87    pub async fn handle_accepted(
88        &self,
89        accepted_sink: SubscriptionSink,
90        kind: SubscriptionKind,
91        params: Option<Params>,
92    ) -> Result<(), ErrorObject<'static>> {
93        #[allow(unreachable_patterns)]
94        match kind {
95            SubscriptionKind::NewHeads => {
96                pipe_from_stream(accepted_sink, self.new_headers_stream()).await
97            }
98            SubscriptionKind::Logs => {
99                // if no params are provided, used default filter params
100                let filter = match params {
101                    Some(Params::Logs(filter)) => *filter,
102                    Some(Params::Bool(_)) => {
103                        return Err(invalid_params_rpc_err("Invalid params for logs"))
104                    }
105                    _ => Default::default(),
106                };
107                pipe_from_stream(accepted_sink, self.log_stream(filter)).await
108            }
109            SubscriptionKind::NewPendingTransactions => {
110                if let Some(params) = params {
111                    match params {
112                        Params::Bool(true) => {
113                            // full transaction objects requested
114                            let stream = self.full_pending_transaction_stream().filter_map(|tx| {
115                                let tx_value = match self
116                                    .inner
117                                    .eth_api
118                                    .converter()
119                                    .fill_pending(tx.transaction.to_consensus())
120                                {
121                                    Ok(tx) => Some(tx),
122                                    Err(err) => {
123                                        error!(target = "rpc",
124                                            %err,
125                                            "Failed to fill transaction with block context"
126                                        );
127                                        None
128                                    }
129                                };
130                                std::future::ready(tx_value)
131                            });
132                            return pipe_from_stream(accepted_sink, stream).await
133                        }
134                        Params::Bool(false) | Params::None => {
135                            // only hashes requested
136                        }
137                        _ => {
138                            return Err(invalid_params_rpc_err(
139                                "Invalid params for newPendingTransactions",
140                            ))
141                        }
142                    }
143                }
144
145                pipe_from_stream(accepted_sink, self.pending_transaction_hashes_stream()).await
146            }
147            SubscriptionKind::Syncing => {
148                // get new block subscription
149                let mut canon_state = BroadcastStream::new(
150                    self.inner.eth_api.provider().subscribe_to_canonical_state(),
151                );
152                // get current sync status
153                let mut initial_sync_status = self.inner.eth_api.network().is_syncing();
154                let current_sub_res = self.sync_status(initial_sync_status);
155
156                // send the current status immediately
157                let msg = SubscriptionMessage::new(
158                    accepted_sink.method_name(),
159                    accepted_sink.subscription_id(),
160                    &current_sub_res,
161                )
162                .map_err(SubscriptionSerializeError::new)?;
163
164                if accepted_sink.send(msg).await.is_err() {
165                    return Ok(())
166                }
167
168                while canon_state.next().await.is_some() {
169                    let current_syncing = self.inner.eth_api.network().is_syncing();
170                    // Only send a new response if the sync status has changed
171                    if current_syncing != initial_sync_status {
172                        // Update the sync status on each new block
173                        initial_sync_status = current_syncing;
174
175                        // send a new message now that the status changed
176                        let sync_status = self.sync_status(current_syncing);
177                        let msg = SubscriptionMessage::new(
178                            accepted_sink.method_name(),
179                            accepted_sink.subscription_id(),
180                            &sync_status,
181                        )
182                        .map_err(SubscriptionSerializeError::new)?;
183
184                        if accepted_sink.send(msg).await.is_err() {
185                            break
186                        }
187                    }
188                }
189
190                Ok(())
191            }
192            SubscriptionKind::TransactionReceipts => {
193                let filter = match params {
194                    Some(Params::TransactionReceipts(filter)) => filter,
195                    None | Some(Params::None) => TransactionReceiptsParams::default(),
196                    _ => {
197                        return Err(invalid_params_rpc_err("Invalid params for transactionReceipts"))
198                    }
199                };
200
201                let converter = self.inner.eth_api.converter();
202                let stream = self.inner.eth_api.provider().canonical_state_stream().flat_map(
203                    move |new_chain| {
204                        // for each block in the new chain, build RPC receipts
205                        let results: Vec<_> = new_chain
206                            .committed()
207                            .blocks_and_receipts()
208                            .filter_map(|(block, receipts)| {
209                                let block_hash = block.hash();
210                                let block_number = block.number();
211                                let base_fee = block.base_fee_per_gas();
212                                let excess_blob_gas = block.excess_blob_gas();
213                                let timestamp = block.timestamp();
214
215                                let mut gas_used: u64 = 0;
216                                let mut next_log_index: usize = 0;
217
218                                // build ConvertReceiptInput for each tx+receipt pair
219                                // (same logic as eth_getBlockReceipts HTTP endpoint)
220                                let inputs: Vec<_> = block
221                                    .transactions_recovered()
222                                    .zip(receipts.iter())
223                                    .enumerate()
224                                    .filter_map(|(idx, (tx, receipt))| {
225                                        let gas_used_before = gas_used;
226                                        let next_log_index_before = next_log_index;
227                                        let cumulative_gas_used = receipt.cumulative_gas_used();
228
229                                        gas_used = cumulative_gas_used;
230                                        next_log_index += receipt.logs().len();
231
232                                        // apply transaction hash filter if provided
233                                        let matches = match &filter.transaction_hashes {
234                                            Some(hashes) if !hashes.is_empty() => {
235                                                hashes.contains(tx.tx_hash())
236                                            }
237                                            _ => true,
238                                        };
239
240                                        matches.then(|| ConvertReceiptInput {
241                                            tx,
242                                            gas_used: cumulative_gas_used - gas_used_before,
243                                            next_log_index: next_log_index_before,
244                                            meta: TransactionMeta {
245                                                tx_hash: *tx.tx_hash(),
246                                                index: idx as u64,
247                                                block_hash,
248                                                block_number,
249                                                base_fee,
250                                                excess_blob_gas,
251                                                timestamp,
252                                            },
253                                            receipt: receipt.clone(),
254                                        })
255                                    })
256                                    .collect();
257
258                                if inputs.is_empty() {
259                                    return None;
260                                }
261
262                                match converter.convert_receipts(inputs) {
263                                    Ok(rpc_receipts) => Some(rpc_receipts),
264                                    Err(err) => {
265                                        error!(
266                                            target = "rpc",
267                                            %err,
268                                            "Failed to convert receipts"
269                                        );
270                                        None
271                                    }
272                                }
273                            })
274                            .collect();
275
276                        futures::stream::iter(results)
277                    },
278                );
279
280                pipe_from_stream(accepted_sink, stream).await
281            }
282            _ => Err(invalid_params_rpc_err("Unsupported subscription kind")),
283        }
284    }
285}
286
287#[async_trait::async_trait]
288impl<Eth> EthPubSubApiServer<RpcTransaction<Eth::NetworkTypes>> for EthPubSub<Eth>
289where
290    Eth: RpcNodeCore + EthApiTypes<RpcConvert: RpcConvert<Primitives = Eth::Primitives>>,
291{
292    /// Handler for `eth_subscribe`
293    async fn subscribe(
294        &self,
295        pending: PendingSubscriptionSink,
296        kind: SubscriptionKind,
297        params: Option<Params>,
298    ) -> jsonrpsee::core::SubscriptionResult {
299        let sink = pending.accept().await?;
300        let pubsub = self.clone();
301        self.inner.subscription_task_spawner.spawn_task(async move {
302            let _ = pubsub.handle_accepted(sink, kind, params).await;
303        });
304
305        Ok(())
306    }
307}
308
309/// Helper to convert a serde error into an [`ErrorObject`]
310#[derive(Debug, thiserror::Error)]
311#[error("Failed to serialize subscription item: {0}")]
312pub struct SubscriptionSerializeError(#[from] serde_json::Error);
313
314impl SubscriptionSerializeError {
315    const fn new(err: serde_json::Error) -> Self {
316        Self(err)
317    }
318}
319
320impl From<SubscriptionSerializeError> for ErrorObject<'static> {
321    fn from(value: SubscriptionSerializeError) -> Self {
322        internal_rpc_err(value.to_string())
323    }
324}
325
326/// Pipes all stream items to the subscription sink.
327async fn pipe_from_stream<T, St>(
328    sink: SubscriptionSink,
329    mut stream: St,
330) -> Result<(), ErrorObject<'static>>
331where
332    St: Stream<Item = T> + Unpin,
333    T: Serialize,
334{
335    loop {
336        tokio::select! {
337            _ = sink.closed() => {
338                // connection dropped
339                break Ok(())
340            },
341            maybe_item = stream.next() => {
342                let item = match maybe_item {
343                    Some(item) => item,
344                    None => {
345                        // stream ended
346                        break  Ok(())
347                    },
348                };
349                let msg = SubscriptionMessage::new(
350                    sink.method_name(),
351                    sink.subscription_id(),
352                    &item
353                ).map_err(SubscriptionSerializeError::new)?;
354
355                if sink.send(msg).await.is_err() {
356                    break Ok(());
357                }
358            }
359        }
360    }
361}
362
363impl<Eth> std::fmt::Debug for EthPubSub<Eth> {
364    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
365        f.debug_struct("EthPubSub").finish_non_exhaustive()
366    }
367}
368
369/// Container type `EthPubSub`
370#[derive(Clone)]
371struct EthPubSubInner<EthApi> {
372    /// The `eth` API.
373    eth_api: EthApi,
374    /// The type that's used to spawn subscription tasks.
375    subscription_task_spawner: Runtime,
376}
377
// === impl EthPubSubInner ===
379
380impl<Eth> EthPubSubInner<Eth>
381where
382    Eth: RpcNodeCore<Provider: BlockNumReader>,
383{
384    /// Returns the current sync status for the `syncing` subscription
385    fn sync_status(&self, is_syncing: bool) -> PubSubSyncStatus {
386        if is_syncing {
387            let current_block = self
388                .eth_api
389                .provider()
390                .chain_info()
391                .map(|info| info.best_number)
392                .unwrap_or_default();
393            PubSubSyncStatus::Detailed(SyncStatusMetadata {
394                syncing: true,
395                starting_block: 0,
396                current_block,
397                highest_block: Some(current_block),
398            })
399        } else {
400            PubSubSyncStatus::Simple(false)
401        }
402    }
403}
404
405impl<Eth> EthPubSubInner<Eth>
406where
407    Eth: RpcNodeCore<Pool: TransactionPool>,
408{
409    /// Returns a stream that yields all transaction hashes emitted by the txpool.
410    fn pending_transaction_hashes_stream(&self) -> impl Stream<Item = TxHash> {
411        ReceiverStream::new(self.eth_api.pool().pending_transactions_listener())
412    }
413
414    /// Returns a stream that yields all transactions emitted by the txpool.
415    fn full_pending_transaction_stream(
416        &self,
417    ) -> impl Stream<Item = NewTransactionEvent<<Eth::Pool as TransactionPool>::Transaction>> {
418        self.eth_api.pool().new_pending_pool_transactions_listener()
419    }
420}
421
422impl<Eth> EthPubSubInner<Eth>
423where
424    Eth: EthApiTypes<RpcConvert: RpcConvert<Primitives = Eth::Primitives>> + RpcNodeCore,
425{
426    /// Returns a stream that yields all new RPC blocks.
427    fn new_headers_stream(&self) -> impl Stream<Item = RpcHeader<Eth::NetworkTypes>> {
428        let converter = self.eth_api.converter();
429        self.eth_api.provider().canonical_state_stream().flat_map(|new_chain| {
430            let headers = new_chain
431                .committed()
432                .blocks_iter()
433                .filter_map(|block| {
434                    match converter.convert_header(block.clone_sealed_header(), block.rlp_length())
435                    {
436                        Ok(header) => Some(header),
437                        Err(err) => {
438                            error!(target = "rpc", %err, "Failed to convert header");
439                            None
440                        }
441                    }
442                })
443                .collect::<Vec<_>>();
444            futures::stream::iter(headers)
445        })
446    }
447
448    /// Returns a stream that yields all logs that match the given filter.
449    fn log_stream(&self, filter: Filter) -> impl Stream<Item = Log> {
450        BroadcastStream::new(self.eth_api.provider().subscribe_to_canonical_state())
451            .map(move |canon_state| {
452                canon_state.expect("new block subscription never ends").block_receipts()
453            })
454            .flat_map(futures::stream::iter)
455            .flat_map(move |(block_receipts, removed)| {
456                let all_logs = logs_utils::matching_block_logs_with_tx_hashes(
457                    &filter,
458                    block_receipts.block,
459                    block_receipts.timestamp,
460                    block_receipts.tx_receipts.iter().map(|(tx, receipt)| (*tx, receipt)),
461                    removed,
462                );
463                futures::stream::iter(all_logs)
464            })
465    }
466}