reth_ipc/server/
connection.rs
1use crate::stream_codec::StreamCodec;
4use futures::{stream::FuturesUnordered, FutureExt, Sink, Stream};
5use std::{
6 collections::VecDeque,
7 future::Future,
8 io,
9 pin::Pin,
10 task::{Context, Poll},
11};
12use tokio::io::{AsyncRead, AsyncWrite};
13use tokio_util::codec::Framed;
14use tower::Service;
15
16pub(crate) type JsonRpcStream<T> = Framed<T, StreamCodec>;
17
18#[pin_project::pin_project]
19pub(crate) struct IpcConn<T>(#[pin] pub(crate) T);
20
21impl<T> Stream for IpcConn<JsonRpcStream<T>>
22where
23 T: AsyncRead + AsyncWrite,
24{
25 type Item = io::Result<String>;
26
27 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
28 self.project().0.poll_next(cx)
29 }
30}
31
32impl<T> Sink<String> for IpcConn<JsonRpcStream<T>>
33where
34 T: AsyncRead + AsyncWrite,
35{
36 type Error = io::Error;
37
38 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
39 self.project().0.poll_flush(cx)
42 }
43
44 fn start_send(self: Pin<&mut Self>, item: String) -> Result<(), Self::Error> {
45 self.project().0.start_send(item)
46 }
47
48 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
49 self.project().0.poll_flush(cx)
50 }
51
52 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
53 self.project().0.poll_close(cx)
54 }
55}
56
57#[pin_project::pin_project]
64#[must_use = "futures do nothing unless you `.await` or poll them"]
65pub(crate) struct IpcConnDriver<T, S, Fut> {
66 #[pin]
67 pub(crate) conn: IpcConn<JsonRpcStream<T>>,
68 pub(crate) service: S,
69 #[pin]
71 pub(crate) pending_calls: FuturesUnordered<Fut>,
72 pub(crate) items: VecDeque<String>,
73}
74
75impl<T, S, Fut> IpcConnDriver<T, S, Fut> {
76 pub(crate) fn push_back(&mut self, item: String) {
78 self.items.push_back(item);
79 }
80}
81
82impl<T, S> Future for IpcConnDriver<T, S, S::Future>
83where
84 S: Service<String, Response = Option<String>> + Send + 'static,
85 S::Error: Into<Box<dyn core::error::Error + Send + Sync>>,
86 S::Future: Send + Unpin,
87 T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
88{
89 type Output = ();
90
91 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
92 let mut this = self.project();
93
94 let mut budget = 5;
98
99 'outer: loop {
101 budget -= 1;
102 if budget == 0 {
103 cx.waker().wake_by_ref();
105 return Poll::Pending
106 }
107
108 while this.conn.as_mut().poll_ready(cx).is_ready() {
110 if let Some(item) = this.items.pop_front() {
111 if let Err(err) = this.conn.as_mut().start_send(item) {
112 tracing::warn!("IPC response failed: {:?}", err);
113 return Poll::Ready(())
114 }
115 } else {
116 break
117 }
118 }
119
120 'inner: loop {
121 let drained = if this.pending_calls.is_empty() {
123 false
124 } else {
125 if let Poll::Ready(Some(res)) = this.pending_calls.as_mut().poll_next(cx) {
126 let item = match res {
127 Ok(Some(resp)) => resp,
128 Ok(None) => continue 'inner,
129 Err(err) => err.into().to_string(),
130 };
131 this.items.push_back(item);
132 continue 'outer;
133 }
134 true
135 };
136
137 match this.conn.as_mut().poll_next(cx) {
139 Poll::Ready(res) => match res {
140 Some(Ok(item)) => {
141 let mut call = this.service.call(item);
142 match call.poll_unpin(cx) {
143 Poll::Ready(res) => {
144 let item = match res {
145 Ok(Some(resp)) => resp,
146 Ok(None) => continue 'inner,
147 Err(err) => err.into().to_string(),
148 };
149 this.items.push_back(item);
150 continue 'outer
151 }
152 Poll::Pending => {
153 this.pending_calls.push(call);
154 }
155 }
156 }
157 Some(Err(err)) => {
158 tracing::debug!("IPC request failed: {:?}", err);
160 return Poll::Ready(())
161 }
162 None => return Poll::Ready(()),
163 },
164 Poll::Pending => {
165 if drained || this.pending_calls.is_empty() {
166 return Poll::Pending
168 }
169 }
170 }
171 }
172 }
173 }
174}