1use std::sync::Arc;
4
5use alloy_consensus::{transaction::TxHashRef, BlockHeader, TxReceipt};
6use alloy_primitives::TxHash;
7use alloy_rpc_types_eth::{
8 pubsub::{
9 Params, PubSubSyncStatus, SubscriptionKind, SyncStatusMetadata, TransactionReceiptsParams,
10 },
11 Filter, Log,
12};
13use futures::StreamExt;
14use jsonrpsee::{
15 server::SubscriptionMessage, types::ErrorObject, PendingSubscriptionSink, SubscriptionSink,
16};
17use reth_chain_state::CanonStateSubscriptions;
18use reth_network_api::NetworkInfo;
19use reth_primitives_traits::TransactionMeta;
20use reth_rpc_convert::{transaction::ConvertReceiptInput, RpcHeader};
21use reth_rpc_eth_api::{
22 pubsub::EthPubSubApiServer, EthApiTypes, RpcConvert, RpcNodeCore, RpcTransaction,
23};
24use reth_rpc_eth_types::logs_utils;
25use reth_rpc_server_types::result::{internal_rpc_err, invalid_params_rpc_err};
26use reth_storage_api::BlockNumReader;
27use reth_tasks::Runtime;
28use reth_transaction_pool::{NewTransactionEvent, TransactionPool};
29use serde::Serialize;
30use tokio_stream::{
31 wrappers::{BroadcastStream, ReceiverStream},
32 Stream,
33};
34use tracing::error;
35
36#[derive(Clone)]
40pub struct EthPubSub<Eth> {
41 inner: Arc<EthPubSubInner<Eth>>,
43}
44
45impl<Eth> EthPubSub<Eth> {
48 pub fn new(eth_api: Eth, subscription_task_spawner: Runtime) -> Self {
50 let inner = EthPubSubInner { eth_api, subscription_task_spawner };
51 Self { inner: Arc::new(inner) }
52 }
53}
54
55impl<Eth> EthPubSub<Eth>
56where
57 Eth: RpcNodeCore + EthApiTypes<RpcConvert: RpcConvert<Primitives = Eth::Primitives>>,
58{
59 pub fn sync_status(&self, is_syncing: bool) -> PubSubSyncStatus {
61 self.inner.sync_status(is_syncing)
62 }
63
64 pub fn pending_transaction_hashes_stream(&self) -> impl Stream<Item = TxHash> {
66 self.inner.pending_transaction_hashes_stream()
67 }
68
69 pub fn full_pending_transaction_stream(
71 &self,
72 ) -> impl Stream<Item = NewTransactionEvent<<Eth::Pool as TransactionPool>::Transaction>> {
73 self.inner.full_pending_transaction_stream()
74 }
75
76 pub fn new_headers_stream(&self) -> impl Stream<Item = RpcHeader<Eth::NetworkTypes>> {
78 self.inner.new_headers_stream()
79 }
80
81 pub fn log_stream(&self, filter: Filter) -> impl Stream<Item = Log> {
83 self.inner.log_stream(filter)
84 }
85
86 pub async fn handle_accepted(
88 &self,
89 accepted_sink: SubscriptionSink,
90 kind: SubscriptionKind,
91 params: Option<Params>,
92 ) -> Result<(), ErrorObject<'static>> {
93 #[allow(unreachable_patterns)]
94 match kind {
95 SubscriptionKind::NewHeads => {
96 pipe_from_stream(accepted_sink, self.new_headers_stream()).await
97 }
98 SubscriptionKind::Logs => {
99 let filter = match params {
101 Some(Params::Logs(filter)) => *filter,
102 Some(Params::Bool(_)) => {
103 return Err(invalid_params_rpc_err("Invalid params for logs"))
104 }
105 _ => Default::default(),
106 };
107 pipe_from_stream(accepted_sink, self.log_stream(filter)).await
108 }
109 SubscriptionKind::NewPendingTransactions => {
110 if let Some(params) = params {
111 match params {
112 Params::Bool(true) => {
113 let stream = self.full_pending_transaction_stream().filter_map(|tx| {
115 let tx_value = match self
116 .inner
117 .eth_api
118 .converter()
119 .fill_pending(tx.transaction.to_consensus())
120 {
121 Ok(tx) => Some(tx),
122 Err(err) => {
123 error!(target = "rpc",
124 %err,
125 "Failed to fill transaction with block context"
126 );
127 None
128 }
129 };
130 std::future::ready(tx_value)
131 });
132 return pipe_from_stream(accepted_sink, stream).await
133 }
134 Params::Bool(false) | Params::None => {
135 }
137 _ => {
138 return Err(invalid_params_rpc_err(
139 "Invalid params for newPendingTransactions",
140 ))
141 }
142 }
143 }
144
145 pipe_from_stream(accepted_sink, self.pending_transaction_hashes_stream()).await
146 }
147 SubscriptionKind::Syncing => {
148 let mut canon_state = BroadcastStream::new(
150 self.inner.eth_api.provider().subscribe_to_canonical_state(),
151 );
152 let mut initial_sync_status = self.inner.eth_api.network().is_syncing();
154 let current_sub_res = self.sync_status(initial_sync_status);
155
156 let msg = SubscriptionMessage::new(
158 accepted_sink.method_name(),
159 accepted_sink.subscription_id(),
160 ¤t_sub_res,
161 )
162 .map_err(SubscriptionSerializeError::new)?;
163
164 if accepted_sink.send(msg).await.is_err() {
165 return Ok(())
166 }
167
168 while canon_state.next().await.is_some() {
169 let current_syncing = self.inner.eth_api.network().is_syncing();
170 if current_syncing != initial_sync_status {
172 initial_sync_status = current_syncing;
174
175 let sync_status = self.sync_status(current_syncing);
177 let msg = SubscriptionMessage::new(
178 accepted_sink.method_name(),
179 accepted_sink.subscription_id(),
180 &sync_status,
181 )
182 .map_err(SubscriptionSerializeError::new)?;
183
184 if accepted_sink.send(msg).await.is_err() {
185 break
186 }
187 }
188 }
189
190 Ok(())
191 }
192 SubscriptionKind::TransactionReceipts => {
193 let filter = match params {
194 Some(Params::TransactionReceipts(filter)) => filter,
195 None | Some(Params::None) => TransactionReceiptsParams::default(),
196 _ => {
197 return Err(invalid_params_rpc_err("Invalid params for transactionReceipts"))
198 }
199 };
200
201 let converter = self.inner.eth_api.converter();
202 let stream = self.inner.eth_api.provider().canonical_state_stream().flat_map(
203 move |new_chain| {
204 let results: Vec<_> = new_chain
206 .committed()
207 .blocks_and_receipts()
208 .filter_map(|(block, receipts)| {
209 let block_hash = block.hash();
210 let block_number = block.number();
211 let base_fee = block.base_fee_per_gas();
212 let excess_blob_gas = block.excess_blob_gas();
213 let timestamp = block.timestamp();
214
215 let mut gas_used: u64 = 0;
216 let mut next_log_index: usize = 0;
217
218 let inputs: Vec<_> = block
221 .transactions_recovered()
222 .zip(receipts.iter())
223 .enumerate()
224 .filter_map(|(idx, (tx, receipt))| {
225 let gas_used_before = gas_used;
226 let next_log_index_before = next_log_index;
227 let cumulative_gas_used = receipt.cumulative_gas_used();
228
229 gas_used = cumulative_gas_used;
230 next_log_index += receipt.logs().len();
231
232 let matches = match &filter.transaction_hashes {
234 Some(hashes) if !hashes.is_empty() => {
235 hashes.contains(tx.tx_hash())
236 }
237 _ => true,
238 };
239
240 matches.then(|| ConvertReceiptInput {
241 tx,
242 gas_used: cumulative_gas_used - gas_used_before,
243 next_log_index: next_log_index_before,
244 meta: TransactionMeta {
245 tx_hash: *tx.tx_hash(),
246 index: idx as u64,
247 block_hash,
248 block_number,
249 base_fee,
250 excess_blob_gas,
251 timestamp,
252 },
253 receipt: receipt.clone(),
254 })
255 })
256 .collect();
257
258 if inputs.is_empty() {
259 return None;
260 }
261
262 match converter.convert_receipts(inputs) {
263 Ok(rpc_receipts) => Some(rpc_receipts),
264 Err(err) => {
265 error!(
266 target = "rpc",
267 %err,
268 "Failed to convert receipts"
269 );
270 None
271 }
272 }
273 })
274 .collect();
275
276 futures::stream::iter(results)
277 },
278 );
279
280 pipe_from_stream(accepted_sink, stream).await
281 }
282 _ => Err(invalid_params_rpc_err("Unsupported subscription kind")),
283 }
284 }
285}
286
287#[async_trait::async_trait]
288impl<Eth> EthPubSubApiServer<RpcTransaction<Eth::NetworkTypes>> for EthPubSub<Eth>
289where
290 Eth: RpcNodeCore + EthApiTypes<RpcConvert: RpcConvert<Primitives = Eth::Primitives>>,
291{
292 async fn subscribe(
294 &self,
295 pending: PendingSubscriptionSink,
296 kind: SubscriptionKind,
297 params: Option<Params>,
298 ) -> jsonrpsee::core::SubscriptionResult {
299 let sink = pending.accept().await?;
300 let pubsub = self.clone();
301 self.inner.subscription_task_spawner.spawn_task(async move {
302 let _ = pubsub.handle_accepted(sink, kind, params).await;
303 });
304
305 Ok(())
306 }
307}
308
309#[derive(Debug, thiserror::Error)]
311#[error("Failed to serialize subscription item: {0}")]
312pub struct SubscriptionSerializeError(#[from] serde_json::Error);
313
314impl SubscriptionSerializeError {
315 const fn new(err: serde_json::Error) -> Self {
316 Self(err)
317 }
318}
319
320impl From<SubscriptionSerializeError> for ErrorObject<'static> {
321 fn from(value: SubscriptionSerializeError) -> Self {
322 internal_rpc_err(value.to_string())
323 }
324}
325
326async fn pipe_from_stream<T, St>(
328 sink: SubscriptionSink,
329 mut stream: St,
330) -> Result<(), ErrorObject<'static>>
331where
332 St: Stream<Item = T> + Unpin,
333 T: Serialize,
334{
335 loop {
336 tokio::select! {
337 _ = sink.closed() => {
338 break Ok(())
340 },
341 maybe_item = stream.next() => {
342 let item = match maybe_item {
343 Some(item) => item,
344 None => {
345 break Ok(())
347 },
348 };
349 let msg = SubscriptionMessage::new(
350 sink.method_name(),
351 sink.subscription_id(),
352 &item
353 ).map_err(SubscriptionSerializeError::new)?;
354
355 if sink.send(msg).await.is_err() {
356 break Ok(());
357 }
358 }
359 }
360 }
361}
362
363impl<Eth> std::fmt::Debug for EthPubSub<Eth> {
364 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
365 f.debug_struct("EthPubSub").finish_non_exhaustive()
366 }
367}
368
369#[derive(Clone)]
371struct EthPubSubInner<EthApi> {
372 eth_api: EthApi,
374 subscription_task_spawner: Runtime,
376}
377
378impl<Eth> EthPubSubInner<Eth>
381where
382 Eth: RpcNodeCore<Provider: BlockNumReader>,
383{
384 fn sync_status(&self, is_syncing: bool) -> PubSubSyncStatus {
386 if is_syncing {
387 let current_block = self
388 .eth_api
389 .provider()
390 .chain_info()
391 .map(|info| info.best_number)
392 .unwrap_or_default();
393 PubSubSyncStatus::Detailed(SyncStatusMetadata {
394 syncing: true,
395 starting_block: 0,
396 current_block,
397 highest_block: Some(current_block),
398 })
399 } else {
400 PubSubSyncStatus::Simple(false)
401 }
402 }
403}
404
405impl<Eth> EthPubSubInner<Eth>
406where
407 Eth: RpcNodeCore<Pool: TransactionPool>,
408{
409 fn pending_transaction_hashes_stream(&self) -> impl Stream<Item = TxHash> {
411 ReceiverStream::new(self.eth_api.pool().pending_transactions_listener())
412 }
413
414 fn full_pending_transaction_stream(
416 &self,
417 ) -> impl Stream<Item = NewTransactionEvent<<Eth::Pool as TransactionPool>::Transaction>> {
418 self.eth_api.pool().new_pending_pool_transactions_listener()
419 }
420}
421
422impl<Eth> EthPubSubInner<Eth>
423where
424 Eth: EthApiTypes<RpcConvert: RpcConvert<Primitives = Eth::Primitives>> + RpcNodeCore,
425{
426 fn new_headers_stream(&self) -> impl Stream<Item = RpcHeader<Eth::NetworkTypes>> {
428 let converter = self.eth_api.converter();
429 self.eth_api.provider().canonical_state_stream().flat_map(|new_chain| {
430 let headers = new_chain
431 .committed()
432 .blocks_iter()
433 .filter_map(|block| {
434 match converter.convert_header(block.clone_sealed_header(), block.rlp_length())
435 {
436 Ok(header) => Some(header),
437 Err(err) => {
438 error!(target = "rpc", %err, "Failed to convert header");
439 None
440 }
441 }
442 })
443 .collect::<Vec<_>>();
444 futures::stream::iter(headers)
445 })
446 }
447
448 fn log_stream(&self, filter: Filter) -> impl Stream<Item = Log> {
450 BroadcastStream::new(self.eth_api.provider().subscribe_to_canonical_state())
451 .map(move |canon_state| {
452 canon_state.expect("new block subscription never ends").block_receipts()
453 })
454 .flat_map(futures::stream::iter)
455 .flat_map(move |(block_receipts, removed)| {
456 let all_logs = logs_utils::matching_block_logs_with_tx_hashes(
457 &filter,
458 block_receipts.block,
459 block_receipts.timestamp,
460 block_receipts.tx_receipts.iter().map(|(tx, receipt)| (*tx, receipt)),
461 removed,
462 );
463 futures::stream::iter(all_logs)
464 })
465 }
466}