reth_node_ethstats/
connection.rs

1/// Abstractions for managing `WebSocket` connections in the ethstats service.
2use crate::error::ConnectionError;
3use futures_util::{
4    stream::{SplitSink, SplitStream},
5    SinkExt, StreamExt,
6};
7use serde_json::Value;
8use std::sync::Arc;
9use tokio::{net::TcpStream, sync::Mutex};
10use tokio_tungstenite::{
11    tungstenite::protocol::{frame::Utf8Bytes, Message},
12    MaybeTlsStream, WebSocketStream,
13};
14
15/// Type alias for a `WebSocket` stream that may be TLS or plain TCP
16pub(crate) type WsStream = WebSocketStream<MaybeTlsStream<TcpStream>>;
17
18/// Wrapper for a thread-safe, asynchronously accessible `WebSocket` connection
19#[derive(Debug, Clone)]
20pub(crate) struct ConnWrapper {
21    /// Write-only part of the `WebSocket` stream
22    writer: Arc<Mutex<SplitSink<WsStream, Message>>>,
23    /// Read-only part of the `WebSocket` stream
24    reader: Arc<Mutex<SplitStream<WsStream>>>,
25}
26
27impl ConnWrapper {
28    /// Create a new connection wrapper from a `WebSocket` stream
29    pub(crate) fn new(stream: WsStream) -> Self {
30        let (writer, reader) = stream.split();
31
32        Self { writer: Arc::new(Mutex::new(writer)), reader: Arc::new(Mutex::new(reader)) }
33    }
34
35    /// Write a JSON string as a text message to the `WebSocket`
36    pub(crate) async fn write_json(&self, value: &str) -> Result<(), ConnectionError> {
37        let mut writer = self.writer.lock().await;
38        writer.send(Message::Text(Utf8Bytes::from(value))).await?;
39
40        Ok(())
41    }
42
43    /// Read the next JSON text message from the `WebSocket`
44    ///
45    /// Waits for the next text message, parses it as JSON, and returns the value.
46    /// Ignores non-text messages. Returns an error if the connection is closed or if parsing fails.
47    pub(crate) async fn read_json(&self) -> Result<Value, ConnectionError> {
48        let mut reader = self.reader.lock().await;
49        while let Some(msg) = reader.next().await {
50            match msg? {
51                Message::Text(text) => return Ok(serde_json::from_str(&text)?),
52                Message::Close(_) => return Err(ConnectionError::ConnectionClosed),
53                _ => {} // Ignore non-text messages
54            }
55        }
56
57        Err(ConnectionError::ConnectionClosed)
58    }
59
60    /// Close the `WebSocket` connection gracefully
61    pub(crate) async fn close(&self) -> Result<(), ConnectionError> {
62        let mut writer = self.writer.lock().await;
63        writer.close().await?;
64
65        Ok(())
66    }
67}