reth_node_ethstats/
connection.rs1use crate::error::ConnectionError;
3use futures_util::{
4 stream::{SplitSink, SplitStream},
5 SinkExt, StreamExt,
6};
7use serde_json::Value;
8use std::sync::Arc;
9use tokio::{net::TcpStream, sync::Mutex};
10use tokio_tungstenite::{
11 tungstenite::protocol::{frame::Utf8Bytes, Message},
12 MaybeTlsStream, WebSocketStream,
13};
14
15pub(crate) type WsStream = WebSocketStream<MaybeTlsStream<TcpStream>>;
17
18#[derive(Debug, Clone)]
20pub(crate) struct ConnWrapper {
21 writer: Arc<Mutex<SplitSink<WsStream, Message>>>,
23 reader: Arc<Mutex<SplitStream<WsStream>>>,
25}
26
27impl ConnWrapper {
28 pub(crate) fn new(stream: WsStream) -> Self {
30 let (writer, reader) = stream.split();
31
32 Self { writer: Arc::new(Mutex::new(writer)), reader: Arc::new(Mutex::new(reader)) }
33 }
34
35 pub(crate) async fn write_json(&self, value: &str) -> Result<(), ConnectionError> {
37 let mut writer = self.writer.lock().await;
38 writer.send(Message::Text(Utf8Bytes::from(value))).await?;
39
40 Ok(())
41 }
42
43 pub(crate) async fn read_json(&self) -> Result<Value, ConnectionError> {
48 let mut reader = self.reader.lock().await;
49 while let Some(msg) = reader.next().await {
50 match msg? {
51 Message::Text(text) => return Ok(serde_json::from_str(&text)?),
52 Message::Close(_) => return Err(ConnectionError::ConnectionClosed),
53 _ => {} }
55 }
56
57 Err(ConnectionError::ConnectionClosed)
58 }
59
60 pub(crate) async fn close(&self) -> Result<(), ConnectionError> {
62 let mut writer = self.writer.lock().await;
63 writer.close().await?;
64
65 Ok(())
66 }
67}