reth_ipc/server/
connection.rs

//! An IPC connection.
2
3use crate::stream_codec::StreamCodec;
4use futures::{stream::FuturesUnordered, FutureExt, Sink, Stream};
5use std::{
6    collections::VecDeque,
7    future::Future,
8    io,
9    pin::Pin,
10    task::{Context, Poll},
11};
12use tokio::io::{AsyncRead, AsyncWrite};
13use tokio_util::codec::Framed;
14use tower::Service;
15
/// The framed transport used for IPC connections: an IO type `T` wrapped in a [`Framed`] that
/// uses the crate's [`StreamCodec`].
pub(crate) type JsonRpcStream<T> = Framed<T, StreamCodec>;
17
/// Newtype wrapper around a connection transport `T`.
///
/// The wrapped value is marked `#[pin]` so it can be projected to a pinned reference, which is
/// what the `Stream` and `Sink` implementations for `IpcConn<JsonRpcStream<T>>` forward to.
#[pin_project::pin_project]
pub(crate) struct IpcConn<T>(#[pin] pub(crate) T);
20
21impl<T> Stream for IpcConn<JsonRpcStream<T>>
22where
23    T: AsyncRead + AsyncWrite,
24{
25    type Item = io::Result<String>;
26
27    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
28        self.project().0.poll_next(cx)
29    }
30}
31
32impl<T> Sink<String> for IpcConn<JsonRpcStream<T>>
33where
34    T: AsyncRead + AsyncWrite,
35{
36    type Error = io::Error;
37
38    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
39        // NOTE: we always flush here this prevents buffering in the underlying
40        // `Framed` impl that would cause stalled requests
41        self.project().0.poll_flush(cx)
42    }
43
44    fn start_send(self: Pin<&mut Self>, item: String) -> Result<(), Self::Error> {
45        self.project().0.start_send(item)
46    }
47
48    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
49        self.project().0.poll_flush(cx)
50    }
51
52    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
53        self.project().0.poll_close(cx)
54    }
55}
56
/// Drives an [`IpcConn`] forward.
///
/// This forwards received requests from the connection to the service and sends responses to the
/// connection.
///
/// This future terminates when the connection is closed.
#[pin_project::pin_project]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub(crate) struct IpcConnDriver<T, S, Fut> {
    /// The connection that requests are read from and responses are written to.
    #[pin]
    pub(crate) conn: IpcConn<JsonRpcStream<T>>,
    /// The service that is called with every request read from the connection.
    pub(crate) service: S,
    /// RPC requests in progress, i.e. service calls that were still pending when first polled.
    #[pin]
    pub(crate) pending_calls: FuturesUnordered<Fut>,
    /// Outgoing messages waiting to be written to the connection; pushed at the back and popped
    /// from the front.
    pub(crate) items: VecDeque<String>,
}
74
75impl<T, S, Fut> IpcConnDriver<T, S, Fut> {
76    /// Add a new item to the send queue.
77    pub(crate) fn push_back(&mut self, item: String) {
78        self.items.push_back(item);
79    }
80}
81
82impl<T, S> Future for IpcConnDriver<T, S, S::Future>
83where
84    S: Service<String, Response = Option<String>> + Send + 'static,
85    S::Error: Into<Box<dyn core::error::Error + Send + Sync>>,
86    S::Future: Send + Unpin,
87    T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
88{
89    type Output = ();
90
91    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
92        let mut this = self.project();
93
94        // items are also pushed from external
95        // this will act as a manual yield point to reduce latencies of the polling future that may
96        // submit items from an additional source (subscription)
97        let mut budget = 5;
98
99        // ensure we still have enough budget for another iteration
100        'outer: loop {
101            budget -= 1;
102            if budget == 0 {
103                // make sure we're woken up again
104                cx.waker().wake_by_ref();
105                return Poll::Pending
106            }
107
108            // write all responses to the sink
109            while this.conn.as_mut().poll_ready(cx).is_ready() {
110                if let Some(item) = this.items.pop_front() {
111                    if let Err(err) = this.conn.as_mut().start_send(item) {
112                        tracing::warn!("IPC response failed: {:?}", err);
113                        return Poll::Ready(())
114                    }
115                } else {
116                    break
117                }
118            }
119
120            'inner: loop {
121                // drain all calls that are ready and put them in the output item queue
122                let drained = if this.pending_calls.is_empty() {
123                    false
124                } else {
125                    if let Poll::Ready(Some(res)) = this.pending_calls.as_mut().poll_next(cx) {
126                        let item = match res {
127                            Ok(Some(resp)) => resp,
128                            Ok(None) => continue 'inner,
129                            Err(err) => err.into().to_string(),
130                        };
131                        this.items.push_back(item);
132                        continue 'outer;
133                    }
134                    true
135                };
136
137                // read from the stream
138                match this.conn.as_mut().poll_next(cx) {
139                    Poll::Ready(res) => match res {
140                        Some(Ok(item)) => {
141                            let mut call = this.service.call(item);
142                            match call.poll_unpin(cx) {
143                                Poll::Ready(res) => {
144                                    let item = match res {
145                                        Ok(Some(resp)) => resp,
146                                        Ok(None) => continue 'inner,
147                                        Err(err) => err.into().to_string(),
148                                    };
149                                    this.items.push_back(item);
150                                    continue 'outer
151                                }
152                                Poll::Pending => {
153                                    this.pending_calls.push(call);
154                                }
155                            }
156                        }
157                        Some(Err(err)) => {
158                            // this can happen if the client closes the connection
159                            tracing::debug!("IPC request failed: {:?}", err);
160                            return Poll::Ready(())
161                        }
162                        None => return Poll::Ready(()),
163                    },
164                    Poll::Pending => {
165                        if drained || this.pending_calls.is_empty() {
166                            // at this point all things are pending
167                            return Poll::Pending
168                        }
169                    }
170                }
171            }
172        }
173    }
174}