reth_node_core/
exit.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
//! Helper types for waiting for the node to exit.

use futures::{future::BoxFuture, FutureExt};
use std::{
    fmt,
    future::Future,
    pin::Pin,
    task::{ready, Context, Poll},
};

/// A Future which resolves when the node exits
pub struct NodeExitFuture {
    /// The consensus engine future.
    /// This can be polled to wait for the consensus engine to exit.
    consensus_engine_fut: Option<BoxFuture<'static, eyre::Result<()>>>,

    /// Flag indicating whether the node should be terminated after the pipeline sync.
    terminate: bool,
}

impl fmt::Debug for NodeExitFuture {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("NodeExitFuture")
            .field("consensus_engine_fut", &"...")
            .field("terminate", &self.terminate)
            .finish()
    }
}

impl NodeExitFuture {
    /// Create a new `NodeExitFuture`.
    pub fn new<F>(consensus_engine_fut: F, terminate: bool) -> Self
    where
        F: Future<Output = eyre::Result<()>> + 'static + Send,
    {
        Self { consensus_engine_fut: Some(Box::pin(consensus_engine_fut)), terminate }
    }
}

impl Future for NodeExitFuture {
    type Output = eyre::Result<()>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();
        if let Some(rx) = this.consensus_engine_fut.as_mut() {
            match ready!(rx.poll_unpin(cx)) {
                Ok(_) => {
                    this.consensus_engine_fut.take();
                    if this.terminate {
                        Poll::Ready(Ok(()))
                    } else {
                        Poll::Pending
                    }
                }
                Err(err) => Poll::Ready(Err(err)),
            }
        } else {
            Poll::Pending
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::future::poll_fn;

    #[tokio::test]
    async fn test_node_exit_future_terminate_true() {
        let fut = async { Ok(()) };

        let node_exit_future = NodeExitFuture::new(fut, true);

        let res = node_exit_future.await;

        assert!(res.is_ok());
    }

    #[tokio::test]
    async fn test_node_exit_future_terminate_false() {
        let fut = async { Ok(()) };

        let mut node_exit_future = NodeExitFuture::new(fut, false);
        poll_fn(|cx| {
            assert!(node_exit_future.poll_unpin(cx).is_pending());
            Poll::Ready(())
        })
        .await;
    }
}