1use crate::{
2 connection::ConnWrapper,
3 credentials::EthstatsCredentials,
4 error::EthStatsError,
5 events::{
6 AuthMsg, BlockMsg, BlockStats, HistoryMsg, LatencyMsg, NodeInfo, NodeStats, PendingMsg,
7 PendingStats, PingMsg, StatsMsg, TxStats, UncleStats,
8 },
9};
10use alloy_consensus::{BlockHeader, Sealable};
11use alloy_primitives::U256;
12use reth_chain_state::{CanonStateNotification, CanonStateSubscriptions};
13use reth_network_api::{NetworkInfo, Peers};
14use reth_primitives_traits::{Block, BlockBody};
15use reth_storage_api::{BlockReader, BlockReaderIdExt, NodePrimitivesProvider};
16use reth_transaction_pool::TransactionPool;
17
18use chrono::Local;
19use serde_json::Value;
20use std::{
21 str::FromStr,
22 sync::Arc,
23 time::{Duration, Instant},
24};
25use tokio::{
26 sync::{mpsc, Mutex, RwLock},
27 time::{interval, sleep, timeout},
28};
29use tokio_stream::StreamExt;
30use tokio_tungstenite::connect_async;
31use tracing::{debug, info};
32use url::Url;
33
/// How many blocks before the best block are included in a history report
/// when no explicit list of block numbers is supplied.
const HISTORY_UPDATE_RANGE: u64 = 50;

/// Period of the reconnect timer; also how long the reader task sleeps while
/// no connection is available.
const RECONNECT_INTERVAL: Duration = Duration::from_secs(5);

/// How long a ping may stay unanswered before the connection is closed.
const PING_TIMEOUT: Duration = Duration::from_secs(5);

/// Period of the full report (ping, block, pending transactions, node stats).
const REPORT_INTERVAL: Duration = Duration::from_secs(15);

/// Upper bound on establishing the WebSocket connection.
const CONNECT_TIMEOUT: Duration = Duration::from_secs(10);

/// Upper bound on waiting for the server's reply to the login message.
const READ_TIMEOUT: Duration = Duration::from_secs(30);
46
47#[derive(Debug)]
54pub struct EthStatsService<Network, Provider, Pool> {
55 credentials: EthstatsCredentials,
57 conn: Arc<RwLock<Option<ConnWrapper>>>,
59 last_ping: Arc<Mutex<Option<Instant>>>,
61 network: Network,
63 provider: Provider,
65 pool: Pool,
67}
68
69impl<Network, Provider, Pool> EthStatsService<Network, Provider, Pool>
70where
71 Network: NetworkInfo + Peers,
72 Provider: BlockReaderIdExt + CanonStateSubscriptions,
73 Pool: TransactionPool,
74{
75 pub async fn new(
83 url: &str,
84 network: Network,
85 provider: Provider,
86 pool: Pool,
87 ) -> Result<Self, EthStatsError> {
88 let credentials = EthstatsCredentials::from_str(url)?;
89 let service = Self {
90 credentials,
91 conn: Arc::new(RwLock::new(None)),
92 last_ping: Arc::new(Mutex::new(None)),
93 network,
94 provider,
95 pool,
96 };
97 service.connect().await?;
98
99 Ok(service)
100 }
101
102 async fn connect(&self) -> Result<(), EthStatsError> {
107 debug!(
108 target: "ethstats",
109 "Attempting to connect to EthStats server at {}", self.credentials.host
110 );
111 let full_url = format!("ws://{}/api", self.credentials.host);
112 let url = Url::parse(&full_url)
113 .map_err(|e| EthStatsError::InvalidUrl(format!("Invalid URL: {full_url} - {e}")))?;
114
115 match timeout(CONNECT_TIMEOUT, connect_async(url.to_string())).await {
116 Ok(Ok((ws_stream, _))) => {
117 debug!(
118 target: "ethstats",
119 "Successfully connected to EthStats server at {}", self.credentials.host
120 );
121 let conn: ConnWrapper = ConnWrapper::new(ws_stream);
122 *self.conn.write().await = Some(conn.clone());
123 self.login().await?;
124 Ok(())
125 }
126 Ok(Err(e)) => Err(EthStatsError::InvalidUrl(e.to_string())),
127 Err(_) => {
128 debug!(target: "ethstats", "Connection to EthStats server timed out");
129 Err(EthStatsError::Timeout)
130 }
131 }
132 }
133
134 async fn login(&self) -> Result<(), EthStatsError> {
139 debug!(
140 target: "ethstats",
141 "Attempting to login to EthStats server as node_id {}", self.credentials.node_id
142 );
143 let conn = self.conn.read().await;
144 let conn = conn.as_ref().ok_or(EthStatsError::NotConnected)?;
145
146 let network_status = self
147 .network
148 .network_status()
149 .await
150 .map_err(|e| EthStatsError::AuthError(e.to_string()))?;
151 let id = &self.credentials.node_id;
152 let secret = &self.credentials.secret;
153 let protocol = network_status
154 .capabilities
155 .iter()
156 .map(|cap| format!("{}/{}", cap.name, cap.version))
157 .collect::<Vec<_>>()
158 .join(", ");
159 let port = self.network.local_addr().port() as u64;
160
161 let auth = AuthMsg {
162 id: id.clone(),
163 secret: secret.clone(),
164 info: NodeInfo {
165 name: id.clone(),
166 node: network_status.client_version.clone(),
167 port,
168 network: self.network.chain_id().to_string(),
169 protocol,
170 api: "No".to_string(),
171 os: std::env::consts::OS.into(),
172 os_ver: std::env::consts::ARCH.into(),
173 client: "0.1.1".to_string(),
174 history: true,
175 },
176 };
177
178 let message = auth.generate_login_message();
179 conn.write_json(&message).await?;
180
181 let response =
182 timeout(READ_TIMEOUT, conn.read_json()).await.map_err(|_| EthStatsError::Timeout)??;
183
184 if let Some(ack) = response.get("emit") {
185 if ack.get(0) == Some(&Value::String("ready".to_string())) {
186 info!(
187 target: "ethstats",
188 "Login successful to EthStats server as node_id {}", self.credentials.node_id
189 );
190 return Ok(());
191 }
192 }
193
194 debug!(target: "ethstats", "Login failed: Unauthorized or unexpected login response");
195 Err(EthStatsError::AuthError("Unauthorized or unexpected login response".into()))
196 }
197
198 async fn report_stats(&self) -> Result<(), EthStatsError> {
203 let conn = self.conn.read().await;
204 let conn = conn.as_ref().ok_or(EthStatsError::NotConnected)?;
205
206 let stats_msg = StatsMsg {
207 id: self.credentials.node_id.clone(),
208 stats: NodeStats {
209 active: true,
210 syncing: self.network.is_syncing(),
211 peers: self.network.num_connected_peers() as u64,
212 gas_price: 0, uptime: 100,
214 },
215 };
216
217 let message = stats_msg.generate_stats_message();
218 conn.write_json(&message).await?;
219
220 Ok(())
221 }
222
223 async fn send_ping(&self) -> Result<(), EthStatsError> {
228 let conn = self.conn.read().await;
229 let conn = conn.as_ref().ok_or(EthStatsError::NotConnected)?;
230
231 let ping_time = Instant::now();
232 *self.last_ping.lock().await = Some(ping_time);
233
234 let client_time = Local::now().format("%Y-%m-%d %H:%M:%S%.f %:z %Z").to_string();
235 let ping_msg = PingMsg { id: self.credentials.node_id.clone(), client_time };
236
237 let message = ping_msg.generate_ping_message();
238 conn.write_json(&message).await?;
239
240 let active_ping = self.last_ping.clone();
242 let conn_ref = self.conn.clone();
243 tokio::spawn(async move {
244 sleep(PING_TIMEOUT).await;
245 let mut active = active_ping.lock().await;
246 if active.is_some() {
247 debug!(target: "ethstats", "Ping timeout");
248 *active = None;
249 if let Some(conn) = conn_ref.write().await.take() {
251 let _ = conn.close().await;
252 }
253 }
254 });
255
256 Ok(())
257 }
258
259 async fn report_latency(&self) -> Result<(), EthStatsError> {
264 let conn = self.conn.read().await;
265 let conn = conn.as_ref().ok_or(EthStatsError::NotConnected)?;
266
267 let mut active = self.last_ping.lock().await;
268 if let Some(start) = active.take() {
269 let latency = start.elapsed().as_millis() as u64 / 2;
270
271 debug!(target: "ethstats", "Reporting latency: {}ms", latency);
272
273 let latency_msg = LatencyMsg { id: self.credentials.node_id.clone(), latency };
274
275 let message = latency_msg.generate_latency_message();
276 conn.write_json(&message).await?
277 }
278
279 Ok(())
280 }
281
282 async fn report_pending(&self) -> Result<(), EthStatsError> {
287 let conn = self.conn.read().await;
288 let conn = conn.as_ref().ok_or(EthStatsError::NotConnected)?;
289 let pending = self.pool.pool_size().pending as u64;
290
291 debug!(target: "ethstats", "Reporting pending txs: {}", pending);
292
293 let pending_msg =
294 PendingMsg { id: self.credentials.node_id.clone(), stats: PendingStats { pending } };
295
296 let message = pending_msg.generate_pending_message();
297 conn.write_json(&message).await?;
298
299 Ok(())
300 }
301
302 async fn report_block(
311 &self,
312 head: Option<CanonStateNotification<<Provider as NodePrimitivesProvider>::Primitives>>,
313 ) -> Result<(), EthStatsError> {
314 let conn = self.conn.read().await;
315 let conn = conn.as_ref().ok_or(EthStatsError::NotConnected)?;
316
317 let block_number = if let Some(head) = head {
318 head.tip().header().number()
319 } else {
320 self.provider
321 .best_block_number()
322 .map_err(|e| EthStatsError::DataFetchError(e.to_string()))?
323 };
324
325 match self.provider.block_by_id(block_number.into()) {
326 Ok(Some(block)) => {
327 let block_msg = BlockMsg {
328 id: self.credentials.node_id.clone(),
329 block: self.block_to_stats(&block)?,
330 };
331
332 debug!(target: "ethstats", "Reporting block: {}", block_number);
333
334 let message = block_msg.generate_block_message();
335 conn.write_json(&message).await?;
336 }
337 Ok(None) => {
338 debug!(target: "ethstats", "Block {} not found", block_number);
340 return Err(EthStatsError::BlockNotFound(block_number));
341 }
342 Err(e) => {
343 debug!(target: "ethstats", "Error fetching block {}: {}", block_number, e);
344 return Err(EthStatsError::DataFetchError(e.to_string()));
345 }
346 };
347
348 Ok(())
349 }
350
351 fn block_to_stats(
359 &self,
360 block: &<Provider as BlockReader>::Block,
361 ) -> Result<BlockStats, EthStatsError> {
362 let body = block.body();
363 let header = block.header();
364
365 let txs = body.transaction_hashes_iter().copied().map(|hash| TxStats { hash }).collect();
366
367 Ok(BlockStats {
368 number: U256::from(header.number()),
369 hash: header.hash_slow(),
370 parent_hash: header.parent_hash(),
371 timestamp: U256::from(header.timestamp()),
372 miner: header.beneficiary(),
373 gas_used: header.gas_used(),
374 gas_limit: header.gas_limit(),
375 diff: header.difficulty().to_string(),
376 total_diff: "0".into(),
377 txs,
378 tx_root: header.transactions_root(),
379 root: header.state_root(),
380 uncles: UncleStats(vec![]),
381 })
382 }
383
384 async fn report_history(&self, list: Option<&Vec<u64>>) -> Result<(), EthStatsError> {
393 let conn = self.conn.read().await;
394 let conn = conn.as_ref().ok_or(EthStatsError::NotConnected)?;
395
396 let indexes = if let Some(list) = list {
397 list
398 } else {
399 let best_block_number = self
400 .provider
401 .best_block_number()
402 .map_err(|e| EthStatsError::DataFetchError(e.to_string()))?;
403
404 let start = best_block_number.saturating_sub(HISTORY_UPDATE_RANGE);
405
406 &(start..=best_block_number).collect()
407 };
408
409 let mut blocks = Vec::with_capacity(indexes.len());
410 for &block_number in indexes {
411 match self.provider.block_by_id(block_number.into()) {
412 Ok(Some(block)) => {
413 blocks.push(block);
414 }
415 Ok(None) => {
416 debug!(target: "ethstats", "Block {} not found", block_number);
418 break;
419 }
420 Err(e) => {
421 debug!(target: "ethstats", "Error fetching block {}: {}", block_number, e);
422 break;
423 }
424 }
425 }
426
427 let history: Vec<BlockStats> =
428 blocks.iter().map(|block| self.block_to_stats(block)).collect::<Result<_, _>>()?;
429
430 if history.is_empty() {
431 debug!(target: "ethstats", "No history to send to stats server");
432 } else {
433 debug!(
434 target: "ethstats",
435 "Sending historical blocks to ethstats, first: {}, last: {}",
436 history.first().unwrap().number,
437 history.last().unwrap().number
438 );
439 }
440
441 let history_msg = HistoryMsg { id: self.credentials.node_id.clone(), history };
442
443 let message = history_msg.generate_history_message();
444 conn.write_json(&message).await?;
445
446 Ok(())
447 }
448
449 async fn report(&self) -> Result<(), EthStatsError> {
454 self.send_ping().await?;
455 self.report_block(None).await?;
456 self.report_pending().await?;
457 self.report_stats().await?;
458
459 Ok(())
460 }
461
462 async fn handle_message(&self, msg: Value) -> Result<(), EthStatsError> {
488 let emit = match msg.get("emit") {
489 Some(emit) => emit,
490 None => {
491 debug!(target: "ethstats", "Stats server sent non-broadcast, msg {}", msg);
492 return Err(EthStatsError::InvalidRequest);
493 }
494 };
495
496 let command = match emit.get(0) {
497 Some(Value::String(command)) => command.as_str(),
498 _ => {
499 debug!(target: "ethstats", "Invalid stats server message type, msg {}", msg);
500 return Err(EthStatsError::InvalidRequest);
501 }
502 };
503
504 match command {
505 "node-pong" => {
506 self.report_latency().await?;
507 }
508 "history" => {
509 let block_numbers = emit
510 .get(1)
511 .and_then(|v| v.as_object())
512 .and_then(|obj| obj.get("list"))
513 .and_then(|v| v.as_array());
514
515 if block_numbers.is_none() {
516 self.report_history(None).await?;
517
518 return Ok(());
519 }
520
521 let block_numbers = block_numbers
522 .unwrap()
523 .iter()
524 .map(|val| {
525 val.as_u64().ok_or_else(|| {
526 debug!(
527 target: "ethstats",
528 "Invalid stats history block number, msg {}", msg
529 );
530 EthStatsError::InvalidRequest
531 })
532 })
533 .collect::<Result<_, _>>()?;
534
535 self.report_history(Some(&block_numbers)).await?;
536 }
537 other => debug!(target: "ethstats", "Unhandled command: {}", other),
538 }
539
540 Ok(())
541 }
542
543 pub async fn run(self) {
555 let (shutdown_tx, mut shutdown_rx) = mpsc::channel(1);
557 let (message_tx, mut message_rx) = mpsc::channel(32);
558 let (head_tx, mut head_rx) = mpsc::channel(10);
559
560 let read_handle = {
562 let conn = self.conn.clone();
563 let message_tx = message_tx.clone();
564 let shutdown_tx = shutdown_tx.clone();
565
566 tokio::spawn(async move {
567 loop {
568 let conn = conn.read().await;
569 if let Some(conn) = conn.as_ref() {
570 match conn.read_json().await {
571 Ok(msg) => {
572 if message_tx.send(msg).await.is_err() {
573 break;
574 }
575 }
576 Err(e) => {
577 debug!(target: "ethstats", "Read error: {}", e);
578 break;
579 }
580 }
581 } else {
582 sleep(RECONNECT_INTERVAL).await;
583 }
584 }
585
586 let _ = shutdown_tx.send(()).await;
587 })
588 };
589
590 let canonical_stream_handle = {
591 let mut canonical_stream = self.provider.canonical_state_stream();
592 let head_tx = head_tx.clone();
593 let shutdown_tx = shutdown_tx.clone();
594
595 tokio::spawn(async move {
596 loop {
597 let head = canonical_stream.next().await;
598 if let Some(head) = head {
599 if head_tx.send(head).await.is_err() {
600 break;
601 }
602 }
603 }
604
605 let _ = shutdown_tx.send(()).await;
606 })
607 };
608
609 let mut pending_tx_receiver = self.pool.pending_transactions_listener();
610
611 let mut report_interval = interval(REPORT_INTERVAL);
613 let mut reconnect_interval = interval(RECONNECT_INTERVAL);
614
615 loop {
617 tokio::select! {
618 _ = shutdown_rx.recv() => {
620 info!(target: "ethstats", "Shutting down ethstats service");
621 break;
622 }
623
624 Some(msg) = message_rx.recv() => {
626 if let Err(e) = self.handle_message(msg).await {
627 debug!(target: "ethstats", "Error handling message: {}", e);
628 self.disconnect().await;
629 }
630 }
631
632 Some(head) = head_rx.recv() => {
634 if let Err(e) = self.report_block(Some(head)).await {
635 debug!(target: "ethstats", "Failed to report block: {}", e);
636 self.disconnect().await;
637 }
638
639 if let Err(e) = self.report_pending().await {
640 debug!(target: "ethstats", "Failed to report pending: {}", e);
641 self.disconnect().await;
642 }
643 }
644
645 _= pending_tx_receiver.recv() => {
647 if let Err(e) = self.report_pending().await {
648 debug!(target: "ethstats", "Failed to report pending: {}", e);
649 self.disconnect().await;
650 }
651 }
652
653 _ = report_interval.tick() => {
655 if let Err(e) = self.report().await {
656 debug!(target: "ethstats", "Failed to report: {}", e);
657 self.disconnect().await;
658 }
659 }
660
661 _ = reconnect_interval.tick(), if self.conn.read().await.is_none() => {
663 match self.connect().await {
664 Ok(_) => info!(target: "ethstats", "Reconnected successfully"),
665 Err(e) => debug!(target: "ethstats", "Reconnect failed: {}", e),
666 }
667 }
668 }
669 }
670
671 self.disconnect().await;
673
674 read_handle.abort();
676 canonical_stream_handle.abort();
677 }
678
679 async fn disconnect(&self) {
684 if let Some(conn) = self.conn.write().await.take() {
685 if let Err(e) = conn.close().await {
686 debug!(target: "ethstats", "Error closing connection: {}", e);
687 }
688 }
689 }
690
/// Test helper: reports whether a connection is currently stored.
#[cfg(test)]
pub async fn is_connected(&self) -> bool {
    let guard = self.conn.read().await;
    guard.is_some()
}
696}
697
#[cfg(test)]
mod tests {
    use super::*;
    use futures_util::{SinkExt, StreamExt};
    use reth_network_api::noop::NoopNetwork;
    use reth_storage_api::noop::NoopProvider;
    use reth_transaction_pool::noop::NoopTransactionPool;
    use serde_json::json;
    use tokio::net::TcpListener;
    use tokio_tungstenite::tungstenite::protocol::{frame::Utf8Bytes, Message};

    const TEST_HOST: &str = "127.0.0.1";
    /// Port 0 asks the OS for any free port; the real one is read back from
    /// the listener.
    const TEST_PORT: u16 = 0;

    /// Starts a single-connection mock ethstats server.
    ///
    /// The server answers a `"hello"` login with `"ready"` and afterwards
    /// replies to every text frame containing `node-ping` with a `node-pong`.
    /// Returns the `host:port` it listens on and the handle of the server
    /// task.
    async fn setup_mock_server() -> (String, tokio::task::JoinHandle<()>) {
        let listener = TcpListener::bind((TEST_HOST, TEST_PORT)).await.unwrap();
        let addr = listener.local_addr().unwrap();

        let handle = tokio::spawn(async move {
            let (stream, _) = listener.accept().await.unwrap();
            let mut ws_stream = tokio_tungstenite::accept_async(stream).await.unwrap();

            // Login handshake.
            if let Some(Ok(Message::Text(text))) = ws_stream.next().await {
                let value: serde_json::Value = serde_json::from_str(&text).unwrap();
                if value["emit"][0] == "hello" {
                    let response = json!({
                        "emit": ["ready", []]
                    });
                    ws_stream
                        .send(Message::Text(Utf8Bytes::from(response.to_string())))
                        .await
                        .unwrap();
                }
            }

            // Answer pings until the client goes away.
            while let Some(Ok(msg)) = ws_stream.next().await {
                if let Message::Text(text) = msg {
                    if text.contains("node-ping") {
                        let pong = json!({
                            "emit": ["node-pong", {"id": "test-node"}]
                        });
                        ws_stream
                            .send(Message::Text(Utf8Bytes::from(pong.to_string())))
                            .await
                            .unwrap();
                    }
                }
            }
        });

        (addr.to_string(), handle)
    }

    #[tokio::test]
    async fn test_connection_and_login() {
        let (server_url, server_handle) = setup_mock_server().await;
        let ethstats_url = format!("test-node:test-secret@{server_url}");

        let network = NoopNetwork::default();
        let provider = NoopProvider::default();
        let pool = NoopTransactionPool::default();

        let service = EthStatsService::new(&ethstats_url, network, provider, pool)
            .await
            .expect("Service should connect");

        assert!(service.is_connected().await, "Service should be connected");

        server_handle.abort();
    }

    #[tokio::test]
    async fn test_history_command_handling() {
        let (server_url, server_handle) = setup_mock_server().await;
        let ethstats_url = format!("test-node:test-secret@{server_url}");

        let network = NoopNetwork::default();
        let provider = NoopProvider::default();
        let pool = NoopTransactionPool::default();

        let service = EthStatsService::new(&ethstats_url, network, provider, pool)
            .await
            .expect("Service should connect");

        let history_cmd = json!({
            "emit": ["history", {"list": [1, 2, 3]}]
        });

        service.handle_message(history_cmd).await.expect("History command should be handled");

        server_handle.abort();
    }

    #[tokio::test]
    async fn test_invalid_url_handling() {
        let network = NoopNetwork::default();
        let provider = NoopProvider::default();
        let pool = NoopTransactionPool::default();

        // Credentials without a secret.
        let result = EthStatsService::new(
            "test-node@localhost",
            network.clone(),
            provider.clone(),
            pool.clone(),
        )
        .await;
        assert!(
            matches!(result, Err(EthStatsError::InvalidUrl(_))),
            "Should detect invalid URL format"
        );

        // No credentials at all.
        let result = EthStatsService::new("invalid-url", network, provider, pool).await;
        assert!(
            matches!(result, Err(EthStatsError::InvalidUrl(_))),
            "Should detect invalid URL format"
        );
    }
}