reth_node_core/
exit.rs

1//! Helper types for waiting for the node to exit.
2
3use futures::{future::BoxFuture, FutureExt};
4use std::{
5    fmt,
6    future::Future,
7    pin::Pin,
8    task::{ready, Context, Poll},
9};
10
11/// A Future which resolves when the node exits
12pub struct NodeExitFuture {
13    /// The consensus engine future.
14    /// This can be polled to wait for the consensus engine to exit.
15    consensus_engine_fut: Option<BoxFuture<'static, eyre::Result<()>>>,
16
17    /// Flag indicating whether the node should be terminated after the pipeline sync.
18    terminate: bool,
19}
20
21impl fmt::Debug for NodeExitFuture {
22    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
23        f.debug_struct("NodeExitFuture")
24            .field("consensus_engine_fut", &"...")
25            .field("terminate", &self.terminate)
26            .finish()
27    }
28}
29
30impl NodeExitFuture {
31    /// Create a new `NodeExitFuture`.
32    pub fn new<F>(consensus_engine_fut: F, terminate: bool) -> Self
33    where
34        F: Future<Output = eyre::Result<()>> + 'static + Send,
35    {
36        Self { consensus_engine_fut: Some(Box::pin(consensus_engine_fut)), terminate }
37    }
38}
39
40impl Future for NodeExitFuture {
41    type Output = eyre::Result<()>;
42
43    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
44        let this = self.get_mut();
45        if let Some(rx) = this.consensus_engine_fut.as_mut() {
46            match ready!(rx.poll_unpin(cx)) {
47                Ok(_) => {
48                    this.consensus_engine_fut.take();
49                    if this.terminate {
50                        Poll::Ready(Ok(()))
51                    } else {
52                        Poll::Pending
53                    }
54                }
55                Err(err) => Poll::Ready(Err(err)),
56            }
57        } else {
58            Poll::Pending
59        }
60    }
61}
62
63#[cfg(test)]
64mod tests {
65    use super::*;
66    use std::future::poll_fn;
67
68    #[tokio::test]
69    async fn test_node_exit_future_terminate_true() {
70        let fut = async { Ok(()) };
71
72        let node_exit_future = NodeExitFuture::new(fut, true);
73
74        let res = node_exit_future.await;
75
76        assert!(res.is_ok());
77    }
78
79    #[tokio::test]
80    async fn test_node_exit_future_terminate_false() {
81        let fut = async { Ok(()) };
82
83        let mut node_exit_future = NodeExitFuture::new(fut, false);
84        poll_fn(|cx| {
85            assert!(node_exit_future.poll_unpin(cx).is_pending());
86            Poll::Ready(())
87        })
88        .await;
89    }
90}