1use crate::{
2 connection::ConnWrapper,
3 credentials::EthstatsCredentials,
4 error::EthStatsError,
5 events::{
6 AuthMsg, BlockMsg, BlockStats, HistoryMsg, LatencyMsg, NodeInfo, NodeStats, PayloadMsg,
7 PayloadStats, PendingMsg, PendingStats, PingMsg, StatsMsg, TxStats, UncleStats,
8 },
9};
10use alloy_consensus::{BlockHeader, Sealable};
11use alloy_primitives::U256;
12use reth_chain_state::{CanonStateNotification, CanonStateSubscriptions};
13use reth_network_api::{NetworkInfo, Peers};
14use reth_primitives_traits::{Block, BlockBody};
15use reth_storage_api::{BlockReader, BlockReaderIdExt, NodePrimitivesProvider};
16use reth_transaction_pool::TransactionPool;
17
18use chrono::Local;
19use serde_json::Value;
20use std::{
21 str::FromStr,
22 sync::Arc,
23 time::{Duration, Instant},
24};
25use tokio::{
26 sync::{mpsc, Mutex, RwLock},
27 time::{interval, sleep, timeout},
28};
29use tokio_stream::StreamExt;
30use tokio_tungstenite::connect_async;
31use tracing::{debug, info};
32use url::Url;
33
34const HISTORY_UPDATE_RANGE: u64 = 50;
36const RECONNECT_INTERVAL: Duration = Duration::from_secs(5);
38const PING_TIMEOUT: Duration = Duration::from_secs(5);
40const REPORT_INTERVAL: Duration = Duration::from_secs(15);
42const CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
44const READ_TIMEOUT: Duration = Duration::from_secs(30);
46
47#[derive(Debug, Clone)]
54pub struct EthStatsService<Network, Provider, Pool> {
55 credentials: EthstatsCredentials,
57 conn: Arc<RwLock<Option<ConnWrapper>>>,
59 last_ping: Arc<Mutex<Option<Instant>>>,
61 network: Network,
63 provider: Provider,
65 pool: Pool,
67}
68
69impl<Network, Provider, Pool> EthStatsService<Network, Provider, Pool>
70where
71 Network: NetworkInfo + Peers,
72 Provider: BlockReaderIdExt + CanonStateSubscriptions,
73 Pool: TransactionPool,
74{
75 pub async fn new(
83 url: &str,
84 network: Network,
85 provider: Provider,
86 pool: Pool,
87 ) -> Result<Self, EthStatsError> {
88 let credentials = EthstatsCredentials::from_str(url)?;
89 let service = Self {
90 credentials,
91 conn: Arc::new(RwLock::new(None)),
92 last_ping: Arc::new(Mutex::new(None)),
93 network,
94 provider,
95 pool,
96 };
97 service.connect().await?;
98
99 Ok(service)
100 }
101
102 async fn connect(&self) -> Result<(), EthStatsError> {
107 debug!(
108 target: "ethstats",
109 "Attempting to connect to EthStats server at {}", self.credentials.host
110 );
111 let full_url = format!("ws://{}/api", self.credentials.host);
112 let url = Url::parse(&full_url).map_err(EthStatsError::Url)?;
113
114 match timeout(CONNECT_TIMEOUT, connect_async(url.as_str())).await {
115 Ok(Ok((ws_stream, _))) => {
116 debug!(
117 target: "ethstats",
118 "Successfully connected to EthStats server at {}", self.credentials.host
119 );
120 let conn: ConnWrapper = ConnWrapper::new(ws_stream);
121 *self.conn.write().await = Some(conn);
122 self.login().await?;
123 Ok(())
124 }
125 Ok(Err(e)) => Err(EthStatsError::WebSocket(e)),
126 Err(_) => {
127 debug!(target: "ethstats", "Connection to EthStats server timed out");
128 Err(EthStatsError::Timeout)
129 }
130 }
131 }
132
133 async fn login(&self) -> Result<(), EthStatsError> {
138 debug!(
139 target: "ethstats",
140 "Attempting to login to EthStats server as node_id {}", self.credentials.node_id
141 );
142 let conn = self.conn.read().await;
143 let conn = conn.as_ref().ok_or(EthStatsError::NotConnected)?;
144
145 let network_status = self
146 .network
147 .network_status()
148 .await
149 .map_err(|e| EthStatsError::AuthError(e.to_string()))?;
150 let id = &self.credentials.node_id;
151 let secret = &self.credentials.secret;
152 let protocol = network_status
153 .capabilities
154 .iter()
155 .map(|cap| format!("{}/{}", cap.name, cap.version))
156 .collect::<Vec<_>>()
157 .join(", ");
158 let port = self.network.local_addr().port() as u64;
159
160 let auth = AuthMsg {
161 id: id.clone(),
162 secret: secret.clone(),
163 info: NodeInfo {
164 name: id.clone(),
165 node: network_status.client_version.clone(),
166 port,
167 network: self.network.chain_id().to_string(),
168 protocol,
169 api: "No".to_string(),
170 os: std::env::consts::OS.into(),
171 os_ver: std::env::consts::ARCH.into(),
172 client: "0.1.1".to_string(),
173 history: true,
174 },
175 };
176
177 let message = auth.generate_login_message();
178 conn.write_json(&message).await?;
179
180 let response =
181 timeout(READ_TIMEOUT, conn.read_json()).await.map_err(|_| EthStatsError::Timeout)??;
182
183 if let Some(ack) = response.get("emit") &&
184 ack.get(0) == Some(&Value::String("ready".to_string()))
185 {
186 info!(
187 target: "ethstats",
188 "Login successful to EthStats server as node_id {}", self.credentials.node_id
189 );
190 return Ok(());
191 }
192
193 debug!(target: "ethstats", "Login failed: Unauthorized or unexpected login response");
194 Err(EthStatsError::AuthError("Unauthorized or unexpected login response".into()))
195 }
196
197 async fn report_stats(&self) -> Result<(), EthStatsError> {
202 let conn = self.conn.read().await;
203 let conn = conn.as_ref().ok_or(EthStatsError::NotConnected)?;
204
205 let stats_msg = StatsMsg {
206 id: self.credentials.node_id.clone(),
207 stats: NodeStats {
208 active: true,
209 syncing: self.network.is_syncing(),
210 peers: self.network.num_connected_peers() as u64,
211 gas_price: self.pool.block_info().pending_basefee,
212 uptime: 100,
213 },
214 };
215
216 let message = stats_msg.generate_stats_message();
217 conn.write_json(&message).await?;
218
219 Ok(())
220 }
221
222 async fn send_ping(&self) -> Result<(), EthStatsError> {
227 let conn = self.conn.read().await;
228 let conn = conn.as_ref().ok_or(EthStatsError::NotConnected)?;
229
230 let ping_time = Instant::now();
231 *self.last_ping.lock().await = Some(ping_time);
232
233 let client_time = Local::now().format("%Y-%m-%d %H:%M:%S%.f %:z %Z").to_string();
234 let ping_msg = PingMsg { id: self.credentials.node_id.clone(), client_time };
235
236 let message = ping_msg.generate_ping_message();
237 conn.write_json(&message).await?;
238
239 let active_ping = self.last_ping.clone();
241 let conn_ref = self.conn.clone();
242 tokio::spawn(async move {
243 sleep(PING_TIMEOUT).await;
244 let mut active = active_ping.lock().await;
245 if active.is_some() {
246 debug!(target: "ethstats", "Ping timeout");
247 *active = None;
248 if let Some(conn) = conn_ref.write().await.take() {
250 let _ = conn.close().await;
251 }
252 }
253 });
254
255 Ok(())
256 }
257
258 async fn report_latency(&self) -> Result<(), EthStatsError> {
263 let conn = self.conn.read().await;
264 let conn = conn.as_ref().ok_or(EthStatsError::NotConnected)?;
265
266 let mut active = self.last_ping.lock().await;
267 if let Some(start) = active.take() {
268 let latency = start.elapsed().as_millis() as u64 / 2;
269
270 debug!(target: "ethstats", "Reporting latency: {}ms", latency);
271
272 let latency_msg = LatencyMsg { id: self.credentials.node_id.clone(), latency };
273
274 let message = latency_msg.generate_latency_message();
275 conn.write_json(&message).await?
276 }
277
278 Ok(())
279 }
280
281 async fn report_pending(&self) -> Result<(), EthStatsError> {
286 let conn = self.conn.read().await;
287 let conn = conn.as_ref().ok_or(EthStatsError::NotConnected)?;
288 let pending = self.pool.pool_size().pending as u64;
289
290 debug!(target: "ethstats", "Reporting pending txs: {}", pending);
291
292 let pending_msg =
293 PendingMsg { id: self.credentials.node_id.clone(), stats: PendingStats { pending } };
294
295 let message = pending_msg.generate_pending_message();
296 conn.write_json(&message).await?;
297
298 Ok(())
299 }
300
301 async fn report_block(
310 &self,
311 head: Option<CanonStateNotification<<Provider as NodePrimitivesProvider>::Primitives>>,
312 ) -> Result<(), EthStatsError> {
313 let conn = self.conn.read().await;
314 let conn = conn.as_ref().ok_or(EthStatsError::NotConnected)?;
315
316 let block_number = if let Some(head) = head {
317 head.tip().header().number()
318 } else {
319 self.provider
320 .best_block_number()
321 .map_err(|e| EthStatsError::DataFetchError(e.to_string()))?
322 };
323
324 match self.provider.block_by_id(block_number.into()) {
325 Ok(Some(block)) => {
326 let block_msg = BlockMsg {
327 id: self.credentials.node_id.clone(),
328 block: self.block_to_stats(&block)?,
329 };
330
331 debug!(target: "ethstats", "Reporting block: {}", block_number);
332
333 let message = block_msg.generate_block_message();
334 conn.write_json(&message).await?;
335 }
336 Ok(None) => {
337 debug!(target: "ethstats", "Block {} not found", block_number);
339 return Err(EthStatsError::BlockNotFound(block_number));
340 }
341 Err(e) => {
342 debug!(target: "ethstats", "Error fetching block {}: {}", block_number, e);
343 return Err(EthStatsError::DataFetchError(e.to_string()));
344 }
345 };
346
347 Ok(())
348 }
349
350 pub async fn report_new_payload(
355 &self,
356 block_hash: alloy_primitives::B256,
357 block_number: u64,
358 processing_time: Duration,
359 ) -> Result<(), EthStatsError> {
360 let conn = self.conn.read().await;
361 let conn = conn.as_ref().ok_or(EthStatsError::NotConnected)?;
362
363 let payload_stats = PayloadStats {
364 number: U256::from(block_number),
365 hash: block_hash,
366 processing_time: processing_time.as_millis() as u64,
367 };
368
369 let payload_msg =
370 PayloadMsg { id: self.credentials.node_id.clone(), payload: payload_stats };
371
372 debug!(
373 target: "ethstats",
374 "Reporting new payload: block={}, hash={:?}, processing_time={}ms",
375 block_number,
376 block_hash,
377 processing_time.as_millis()
378 );
379
380 let message = payload_msg.generate_new_payload_message();
381 conn.write_json(&message).await?;
382
383 Ok(())
384 }
385
386 fn block_to_stats(
394 &self,
395 block: &<Provider as BlockReader>::Block,
396 ) -> Result<BlockStats, EthStatsError> {
397 let body = block.body();
398 let header = block.header();
399
400 let txs = body.transaction_hashes_iter().copied().map(|hash| TxStats { hash }).collect();
401
402 Ok(BlockStats {
403 number: U256::from(header.number()),
404 hash: header.hash_slow(),
405 parent_hash: header.parent_hash(),
406 timestamp: U256::from(header.timestamp()),
407 miner: header.beneficiary(),
408 gas_used: header.gas_used(),
409 gas_limit: header.gas_limit(),
410 diff: header.difficulty().to_string(),
411 total_diff: "0".into(),
412 txs,
413 tx_root: header.transactions_root(),
414 root: header.state_root(),
415 uncles: UncleStats(vec![]),
416 })
417 }
418
419 async fn report_history(&self, list: Option<&Vec<u64>>) -> Result<(), EthStatsError> {
428 let conn = self.conn.read().await;
429 let conn = conn.as_ref().ok_or(EthStatsError::NotConnected)?;
430
431 let indexes = if let Some(list) = list {
432 list
433 } else {
434 let best_block_number = self
435 .provider
436 .best_block_number()
437 .map_err(|e| EthStatsError::DataFetchError(e.to_string()))?;
438
439 let start = best_block_number.saturating_sub(HISTORY_UPDATE_RANGE);
440
441 &(start..=best_block_number).collect()
442 };
443
444 let mut blocks = Vec::with_capacity(indexes.len());
445 for &block_number in indexes {
446 match self.provider.block_by_id(block_number.into()) {
447 Ok(Some(block)) => {
448 blocks.push(block);
449 }
450 Ok(None) => {
451 debug!(target: "ethstats", "Block {} not found", block_number);
453 break;
454 }
455 Err(e) => {
456 debug!(target: "ethstats", "Error fetching block {}: {}", block_number, e);
457 break;
458 }
459 }
460 }
461
462 let history: Vec<BlockStats> =
463 blocks.iter().map(|block| self.block_to_stats(block)).collect::<Result<_, _>>()?;
464
465 if history.is_empty() {
466 debug!(target: "ethstats", "No history to send to stats server");
467 } else {
468 debug!(
469 target: "ethstats",
470 "Sending historical blocks to ethstats, first: {}, last: {}",
471 history.first().unwrap().number,
472 history.last().unwrap().number
473 );
474 }
475
476 let history_msg = HistoryMsg { id: self.credentials.node_id.clone(), history };
477
478 let message = history_msg.generate_history_message();
479 conn.write_json(&message).await?;
480
481 Ok(())
482 }
483
484 async fn report(&self) -> Result<(), EthStatsError> {
489 self.send_ping().await?;
490 self.report_block(None).await?;
491 self.report_pending().await?;
492 self.report_stats().await?;
493
494 Ok(())
495 }
496
497 async fn handle_message(&self, msg: Value) -> Result<(), EthStatsError> {
523 let emit = match msg.get("emit") {
524 Some(emit) => emit,
525 None => {
526 debug!(target: "ethstats", "Stats server sent non-broadcast, msg {}", msg);
527 return Err(EthStatsError::InvalidRequest);
528 }
529 };
530
531 let command = match emit.get(0) {
532 Some(Value::String(command)) => command.as_str(),
533 _ => {
534 debug!(target: "ethstats", "Invalid stats server message type, msg {}", msg);
535 return Err(EthStatsError::InvalidRequest);
536 }
537 };
538
539 match command {
540 "node-pong" => {
541 self.report_latency().await?;
542 }
543 "history" => {
544 let block_numbers = emit
545 .get(1)
546 .and_then(|v| v.as_object())
547 .and_then(|obj| obj.get("list"))
548 .and_then(|v| v.as_array());
549
550 if block_numbers.is_none() {
551 self.report_history(None).await?;
552
553 return Ok(());
554 }
555
556 let block_numbers = block_numbers
557 .unwrap()
558 .iter()
559 .map(|val| {
560 val.as_u64().ok_or_else(|| {
561 debug!(
562 target: "ethstats",
563 "Invalid stats history block number, msg {}", msg
564 );
565 EthStatsError::InvalidRequest
566 })
567 })
568 .collect::<Result<_, _>>()?;
569
570 self.report_history(Some(&block_numbers)).await?;
571 }
572 other => debug!(target: "ethstats", "Unhandled command: {}", other),
573 }
574
575 Ok(())
576 }
577
578 pub async fn run(self) {
590 let (shutdown_tx, mut shutdown_rx) = mpsc::channel(1);
592 let (message_tx, mut message_rx) = mpsc::channel(32);
593 let (head_tx, mut head_rx) = mpsc::channel(10);
594
595 let read_handle = {
597 let conn_arc = self.conn.clone();
598 let message_tx = message_tx.clone();
599 let shutdown_tx = shutdown_tx.clone();
600
601 tokio::spawn(async move {
602 loop {
603 let conn_guard = conn_arc.read().await;
604 if let Some(conn) = conn_guard.as_ref() {
605 match conn.read_json().await {
606 Ok(msg) => {
607 if message_tx.send(msg).await.is_err() {
608 break;
609 }
610 }
611 Err(e) => match e {
612 crate::error::ConnectionError::Serialization(err) => {
613 debug!(target: "ethstats", "JSON parse error from stats server: {}", err);
614 }
615 other => {
616 debug!(target: "ethstats", "Read error: {}", other);
617 drop(conn_guard);
618 if let Some(conn) = conn_arc.write().await.take() {
619 let _ = conn.close().await;
620 }
621 }
622 },
623 }
624 } else {
625 sleep(RECONNECT_INTERVAL).await;
626 }
627 }
628
629 let _ = shutdown_tx.send(()).await;
630 })
631 };
632
633 let canonical_stream_handle = {
634 let mut canonical_stream = self.provider.canonical_state_stream();
635 let head_tx = head_tx.clone();
636 let shutdown_tx = shutdown_tx.clone();
637
638 tokio::spawn(async move {
639 loop {
640 let head = canonical_stream.next().await;
641 if let Some(head) = head &&
642 head_tx.send(head).await.is_err()
643 {
644 break;
645 }
646 }
647
648 let _ = shutdown_tx.send(()).await;
649 })
650 };
651
652 let mut pending_tx_receiver = self.pool.pending_transactions_listener();
653
654 let mut report_interval = interval(REPORT_INTERVAL);
656 let mut reconnect_interval = interval(RECONNECT_INTERVAL);
657
658 loop {
660 tokio::select! {
661 _ = shutdown_rx.recv() => {
663 info!(target: "ethstats", "Shutting down ethstats service");
664 break;
665 }
666
667 Some(msg) = message_rx.recv() => {
669 if let Err(e) = self.handle_message(msg).await {
670 debug!(target: "ethstats", "Error handling message: {}", e);
671 self.disconnect().await;
672 }
673 }
674
675 Some(head) = head_rx.recv() => {
677 if let Err(e) = self.report_block(Some(head)).await {
678 debug!(target: "ethstats", "Failed to report block: {}", e);
679 self.disconnect().await;
680 }
681
682 if let Err(e) = self.report_pending().await {
683 debug!(target: "ethstats", "Failed to report pending: {}", e);
684 self.disconnect().await;
685 }
686 }
687
688 _= pending_tx_receiver.recv() => {
690 if let Err(e) = self.report_pending().await {
691 debug!(target: "ethstats", "Failed to report pending: {}", e);
692 self.disconnect().await;
693 }
694 }
695
696 _ = report_interval.tick() => {
698 if let Err(e) = self.report().await {
699 debug!(target: "ethstats", "Failed to report: {}", e);
700 self.disconnect().await;
701 }
702 }
703
704 _ = reconnect_interval.tick() => {
706 if self.conn.read().await.is_none() {
707 match self.connect().await {
708 Ok(_) => info!(target: "ethstats", "Reconnected successfully"),
709 Err(e) => debug!(target: "ethstats", "Reconnect failed: {}", e),
710 }
711 }
712 }
713 }
714 }
715
716 self.disconnect().await;
718
719 read_handle.abort();
721 canonical_stream_handle.abort();
722 }
723
724 async fn disconnect(&self) {
729 if let Some(conn) = self.conn.write().await.take() &&
730 let Err(e) = conn.close().await
731 {
732 debug!(target: "ethstats", "Error closing connection: {}", e);
733 }
734 }
735
736 #[cfg(test)]
738 pub async fn is_connected(&self) -> bool {
739 self.conn.read().await.is_some()
740 }
741}
742
743#[cfg(test)]
744mod tests {
745 use super::*;
746 use futures_util::{SinkExt, StreamExt};
747 use reth_network_api::noop::NoopNetwork;
748 use reth_storage_api::noop::NoopProvider;
749 use reth_transaction_pool::noop::NoopTransactionPool;
750 use serde_json::json;
751 use tokio::net::TcpListener;
752 use tokio_tungstenite::tungstenite::protocol::{frame::Utf8Bytes, Message};
753
754 const TEST_HOST: &str = "127.0.0.1";
755 const TEST_PORT: u16 = 0; async fn setup_mock_server() -> (String, tokio::task::JoinHandle<()>) {
758 let listener = TcpListener::bind((TEST_HOST, TEST_PORT)).await.unwrap();
759 let addr = listener.local_addr().unwrap();
760
761 let handle = tokio::spawn(async move {
762 let (stream, _) = listener.accept().await.unwrap();
763 let mut ws_stream = tokio_tungstenite::accept_async(stream).await.unwrap();
764
765 if let Some(Ok(Message::Text(text))) = ws_stream.next().await {
767 let value: serde_json::Value = serde_json::from_str(&text).unwrap();
768 if value["emit"][0] == "hello" {
769 let response = json!({
770 "emit": ["ready", []]
771 });
772 ws_stream
773 .send(Message::Text(Utf8Bytes::from(response.to_string())))
774 .await
775 .unwrap();
776 }
777 }
778
779 while let Some(Ok(msg)) = ws_stream.next().await {
781 if let Message::Text(text) = msg &&
782 text.contains("node-ping")
783 {
784 let pong = json!({
785 "emit": ["node-pong", {"id": "test-node"}]
786 });
787 ws_stream.send(Message::Text(Utf8Bytes::from(pong.to_string()))).await.unwrap();
788 }
789 }
790 });
791
792 (addr.to_string(), handle)
793 }
794
795 #[tokio::test]
796 async fn test_connection_and_login() {
797 let (server_url, server_handle) = setup_mock_server().await;
798 let ethstats_url = format!("test-node:test-secret@{server_url}");
799
800 let network = NoopNetwork::default();
801 let provider = NoopProvider::default();
802 let pool = NoopTransactionPool::default();
803
804 let service = EthStatsService::new(ðstats_url, network, provider, pool)
805 .await
806 .expect("Service should connect");
807
808 assert!(service.is_connected().await, "Service should be connected");
810
811 server_handle.abort();
813 }
814
815 #[tokio::test]
816 async fn test_history_command_handling() {
817 let (server_url, server_handle) = setup_mock_server().await;
818 let ethstats_url = format!("test-node:test-secret@{server_url}");
819
820 let network = NoopNetwork::default();
821 let provider = NoopProvider::default();
822 let pool = NoopTransactionPool::default();
823
824 let service = EthStatsService::new(ðstats_url, network, provider, pool)
825 .await
826 .expect("Service should connect");
827
828 let history_cmd = json!({
830 "emit": ["history", {"list": [1, 2, 3]}]
831 });
832
833 service.handle_message(history_cmd).await.expect("History command should be handled");
834
835 server_handle.abort();
837 }
838
839 #[tokio::test]
840 async fn test_invalid_url_handling() {
841 let network = NoopNetwork::default();
842 let provider = NoopProvider::default();
843 let pool = NoopTransactionPool::default();
844
845 let result = EthStatsService::new(
847 "test-node@localhost",
848 network.clone(),
849 provider.clone(),
850 pool.clone(),
851 )
852 .await;
853 assert!(
854 matches!(result, Err(EthStatsError::InvalidUrl(_))),
855 "Should detect invalid URL format"
856 );
857
858 let result = EthStatsService::new("invalid-url", network, provider, pool).await;
860 assert!(
861 matches!(result, Err(EthStatsError::InvalidUrl(_))),
862 "Should detect invalid URL format"
863 );
864 }
865}