reth_rpc/eth/helpers/
sync_listener.rs

//! A utility future that asynchronously waits until a node has finished syncing.
2
3use futures::Stream;
4use pin_project::pin_project;
5use reth_network_api::NetworkInfo;
6use std::{
7    future::Future,
8    pin::Pin,
9    task::{ready, Context, Poll},
10};
11
12/// This future resolves once the node is no longer syncing: [`NetworkInfo::is_syncing`].
13#[must_use = "futures do nothing unless polled"]
14#[pin_project]
15#[derive(Debug)]
16pub struct SyncListener<N, St> {
17    #[pin]
18    tick: St,
19    network_info: N,
20}
21
22impl<N, St> SyncListener<N, St> {
23    /// Create a new [`SyncListener`] using the given tick stream.
24    pub const fn new(network_info: N, tick: St) -> Self {
25        Self { tick, network_info }
26    }
27}
28
29impl<N, St, Out> Future for SyncListener<N, St>
30where
31    N: NetworkInfo,
32    St: Stream<Item = Out> + Unpin,
33{
34    type Output = ();
35
36    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
37        let mut this = self.project();
38
39        if !this.network_info.is_syncing() {
40            return Poll::Ready(());
41        }
42
43        loop {
44            let tick_event = ready!(this.tick.as_mut().poll_next(cx));
45
46            match tick_event {
47                Some(_) => {
48                    if !this.network_info.is_syncing() {
49                        return Poll::Ready(());
50                    }
51                }
52                None => return Poll::Ready(()),
53            }
54        }
55    }
56}
57
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_rpc_types_admin::EthProtocolInfo;
    use futures::stream;
    use reth_network_api::{NetworkError, NetworkStatus};
    use std::{
        future::poll_fn,
        net::{IpAddr, SocketAddr},
        pin::pin,
        sync::{
            atomic::{AtomicBool, Ordering},
            Arc,
        },
    };

    /// Minimal [`NetworkInfo`] implementation whose sync state is driven by a shared flag.
    #[derive(Clone)]
    struct TestNetwork {
        /// `true` while the simulated node is syncing.
        syncing: Arc<AtomicBool>,
    }

    impl NetworkInfo for TestNetwork {
        fn local_addr(&self) -> SocketAddr {
            (IpAddr::from([0, 0, 0, 0]), 0).into()
        }

        async fn network_status(&self) -> Result<NetworkStatus, NetworkError> {
            // NOTE(review): presumably required because one of the fields initialised below is
            // deprecated upstream; confirm which one and drop the allow once it is removed.
            #[allow(deprecated)]
            Ok(NetworkStatus {
                client_version: "test".to_string(),
                protocol_version: 5,
                eth_protocol_info: EthProtocolInfo {
                    network: 1,
                    difficulty: None,
                    genesis: Default::default(),
                    config: Default::default(),
                    head: Default::default(),
                },
                capabilities: vec![],
            })
        }

        fn chain_id(&self) -> u64 {
            1
        }

        fn is_syncing(&self) -> bool {
            self.syncing.load(Ordering::SeqCst)
        }

        fn is_initially_syncing(&self) -> bool {
            self.is_syncing()
        }
    }

    /// A node that is not syncing resolves on the first poll, even if the tick stream never
    /// yields.
    #[tokio::test]
    async fn completes_immediately_if_not_syncing() {
        let network = TestNetwork { syncing: Arc::new(AtomicBool::new(false)) };
        let fut = SyncListener::new(network, stream::pending::<()>());
        fut.await;
    }

    /// The listener stays pending while the node is syncing and resolves once the flag is
    /// cleared and a tick arrives.
    #[tokio::test]
    async fn resolves_when_syncing_stops() {
        use tokio::sync::mpsc::unbounded_channel;
        use tokio_stream::wrappers::UnboundedReceiverStream;

        let syncing = Arc::new(AtomicBool::new(true));
        let network = TestNetwork { syncing: syncing.clone() };
        let (tx, rx) = unbounded_channel();
        let mut listener = pin!(SyncListener::new(network, UnboundedReceiverStream::new(rx)));

        // Poll exactly once while the node is still syncing. Without this, the flag would be
        // cleared before the listener is ever polled and it would resolve through the initial
        // check, never exercising the pending-then-woken path.
        let first_poll = poll_fn(|cx| Poll::Ready(listener.as_mut().poll(cx))).await;
        assert!(first_poll.is_pending());

        syncing.store(false, Ordering::SeqCst);
        tx.send(()).expect("receiver is owned by the listener and still alive");

        listener.await;
    }

    /// An exhausted tick stream resolves the listener even though the node still reports that
    /// it is syncing.
    #[tokio::test]
    async fn resolves_when_tick_stream_ends() {
        let network = TestNetwork { syncing: Arc::new(AtomicBool::new(true)) };
        SyncListener::new(network, stream::empty::<()>()).await;
    }
}