reth_rpc/eth/helpers/
sync_listener.rs1use futures::Stream;
4use pin_project::pin_project;
5use reth_network_api::NetworkInfo;
6use std::{
7 future::Future,
8 pin::Pin,
9 task::{ready, Context, Poll},
10};
11
12#[must_use = "futures do nothing unless polled"]
14#[pin_project]
15#[derive(Debug)]
16pub struct SyncListener<N, St> {
17 #[pin]
18 tick: St,
19 network_info: N,
20}
21
22impl<N, St> SyncListener<N, St> {
23 pub const fn new(network_info: N, tick: St) -> Self {
25 Self { tick, network_info }
26 }
27}
28
29impl<N, St, Out> Future for SyncListener<N, St>
30where
31 N: NetworkInfo,
32 St: Stream<Item = Out> + Unpin,
33{
34 type Output = ();
35
36 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
37 let mut this = self.project();
38
39 if !this.network_info.is_syncing() {
40 return Poll::Ready(());
41 }
42
43 loop {
44 let tick_event = ready!(this.tick.as_mut().poll_next(cx));
45
46 match tick_event {
47 Some(_) => {
48 if !this.network_info.is_syncing() {
49 return Poll::Ready(());
50 }
51 }
52 None => return Poll::Ready(()),
53 }
54 }
55 }
56}
57
58#[cfg(test)]
59mod tests {
60 use super::*;
61 use alloy_rpc_types_admin::EthProtocolInfo;
62 use futures::stream;
63 use reth_network_api::{NetworkError, NetworkStatus};
64 use std::{
65 net::{IpAddr, SocketAddr},
66 sync::{
67 atomic::{AtomicBool, Ordering},
68 Arc,
69 },
70 };
71
72 #[derive(Clone)]
73 struct TestNetwork {
74 syncing: Arc<AtomicBool>,
75 }
76
77 impl NetworkInfo for TestNetwork {
78 fn local_addr(&self) -> SocketAddr {
79 (IpAddr::from([0, 0, 0, 0]), 0).into()
80 }
81
82 async fn network_status(&self) -> Result<NetworkStatus, NetworkError> {
83 #[allow(deprecated)]
84 Ok(NetworkStatus {
85 client_version: "test".to_string(),
86 protocol_version: 5,
87 eth_protocol_info: EthProtocolInfo {
88 network: 1,
89 difficulty: None,
90 genesis: Default::default(),
91 config: Default::default(),
92 head: Default::default(),
93 },
94 capabilities: vec![],
95 })
96 }
97
98 fn chain_id(&self) -> u64 {
99 1
100 }
101
102 fn is_syncing(&self) -> bool {
103 self.syncing.load(Ordering::SeqCst)
104 }
105
106 fn is_initially_syncing(&self) -> bool {
107 self.is_syncing()
108 }
109 }
110
111 #[tokio::test]
112 async fn completes_immediately_if_not_syncing() {
113 let network = TestNetwork { syncing: Arc::new(AtomicBool::new(false)) };
114 let fut = SyncListener::new(network, stream::pending::<()>());
115 fut.await;
116 }
117
118 #[tokio::test]
119 async fn resolves_when_syncing_stops() {
120 use tokio::sync::mpsc::unbounded_channel;
121 use tokio_stream::wrappers::UnboundedReceiverStream;
122
123 let syncing = Arc::new(AtomicBool::new(true));
124 let network = TestNetwork { syncing: syncing.clone() };
125 let (tx, rx) = unbounded_channel();
126 let listener = SyncListener::new(network, UnboundedReceiverStream::new(rx));
127 let handle = tokio::spawn(listener);
128
129 syncing.store(false, Ordering::Relaxed);
130 let _ = tx.send(());
131
132 handle.await.unwrap();
133 }
134}