reth_node_ethstats/
ethstats.rs

1use crate::{
2    connection::ConnWrapper,
3    credentials::EthstatsCredentials,
4    error::EthStatsError,
5    events::{
6        AuthMsg, BlockMsg, BlockStats, HistoryMsg, LatencyMsg, NodeInfo, NodeStats, PayloadMsg,
7        PayloadStats, PendingMsg, PendingStats, PingMsg, StatsMsg, TxStats, UncleStats,
8    },
9};
10use alloy_consensus::{BlockHeader, Sealable};
11use alloy_primitives::U256;
12use reth_chain_state::{CanonStateNotification, CanonStateSubscriptions};
13use reth_network_api::{NetworkInfo, Peers};
14use reth_primitives_traits::{Block, BlockBody};
15use reth_storage_api::{BlockReader, BlockReaderIdExt, NodePrimitivesProvider};
16use reth_transaction_pool::TransactionPool;
17
18use chrono::Local;
19use serde_json::Value;
20use std::{
21    str::FromStr,
22    sync::Arc,
23    time::{Duration, Instant},
24};
25use tokio::{
26    sync::{mpsc, Mutex, RwLock},
27    time::{interval, sleep, timeout},
28};
29use tokio_stream::StreamExt;
30use tokio_tungstenite::connect_async;
31use tracing::{debug, info};
32use url::Url;
33
/// Size of the default block window, counted back from the best block, that is used for a
/// history update when no explicit list of block numbers is supplied
const HISTORY_UPDATE_RANGE: u64 = 50;
/// Period of the reconnect timer in the main loop; also the time the read loop sleeps
/// while no connection is available
const RECONNECT_INTERVAL: Duration = Duration::from_secs(5);
/// Time after sending a ping at which a still-pending ping causes the connection to be dropped
const PING_TIMEOUT: Duration = Duration::from_secs(5);
/// Interval between regular reports (ping, block, pending transactions, stats) to the server
const REPORT_INTERVAL: Duration = Duration::from_secs(15);
/// Maximum time to wait for the `WebSocket` connection to be established
const CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
/// Maximum time to wait for the server's reply to the login message
const READ_TIMEOUT: Duration = Duration::from_secs(30);
46
/// Main service for interacting with an `EthStats` server
///
/// This service handles all communication with the `EthStats` server including
/// authentication, stats reporting, block notifications, and connection management.
/// It maintains a persistent `WebSocket` connection and automatically reconnects
/// when the connection is lost.
///
/// The connection slot and the pending-ping timestamp live behind `Arc`s, so clones of
/// the service (and the background tasks it spawns) share the same connection state.
#[derive(Debug, Clone)]
pub struct EthStatsService<Network, Provider, Pool> {
    /// Authentication credentials (node id, secret, host) for the `EthStats` server
    credentials: EthstatsCredentials,
    /// Current `WebSocket` connection; `None` while disconnected, which is what the
    /// reconnect timer in `run` checks for
    conn: Arc<RwLock<Option<ConnWrapper>>>,
    /// Time the most recent ping was sent; cleared when its latency is reported or when
    /// the ping times out
    last_ping: Arc<Mutex<Option<Instant>>>,
    /// Network interface for getting peer and sync information
    network: Network,
    /// Blockchain provider for reading block data and canonical state notifications
    provider: Provider,
    /// Transaction pool for getting pending transaction statistics
    pool: Pool,
}
68
69impl<Network, Provider, Pool> EthStatsService<Network, Provider, Pool>
70where
71    Network: NetworkInfo + Peers,
72    Provider: BlockReaderIdExt + CanonStateSubscriptions,
73    Pool: TransactionPool,
74{
75    /// Create a new `EthStats` service and establish initial connection
76    ///
77    /// # Arguments
78    /// * `url` - Connection string in format "`node_id:secret@host`"
79    /// * `network` - Network interface implementation
80    /// * `provider` - Blockchain provider implementation
81    /// * `pool` - Transaction pool implementation
82    pub async fn new(
83        url: &str,
84        network: Network,
85        provider: Provider,
86        pool: Pool,
87    ) -> Result<Self, EthStatsError> {
88        let credentials = EthstatsCredentials::from_str(url)?;
89        let service = Self {
90            credentials,
91            conn: Arc::new(RwLock::new(None)),
92            last_ping: Arc::new(Mutex::new(None)),
93            network,
94            provider,
95            pool,
96        };
97        service.connect().await?;
98
99        Ok(service)
100    }
101
102    /// Establish `WebSocket` connection to the `EthStats` server
103    ///
104    /// Attempts to connect to the server using the credentials and handles
105    /// connection timeouts and errors.
106    async fn connect(&self) -> Result<(), EthStatsError> {
107        debug!(
108            target: "ethstats",
109            "Attempting to connect to EthStats server at {}", self.credentials.host
110        );
111        let full_url = format!("ws://{}/api", self.credentials.host);
112        let url = Url::parse(&full_url).map_err(EthStatsError::Url)?;
113
114        match timeout(CONNECT_TIMEOUT, connect_async(url.as_str())).await {
115            Ok(Ok((ws_stream, _))) => {
116                debug!(
117                    target: "ethstats",
118                    "Successfully connected to EthStats server at {}", self.credentials.host
119                );
120                let conn: ConnWrapper = ConnWrapper::new(ws_stream);
121                *self.conn.write().await = Some(conn);
122                self.login().await?;
123                Ok(())
124            }
125            Ok(Err(e)) => Err(EthStatsError::WebSocket(e)),
126            Err(_) => {
127                debug!(target: "ethstats", "Connection to EthStats server timed out");
128                Err(EthStatsError::Timeout)
129            }
130        }
131    }
132
133    /// Authenticate with the `EthStats` server
134    ///
135    /// Sends authentication credentials and node information to the server
136    /// and waits for a successful acknowledgment.
137    async fn login(&self) -> Result<(), EthStatsError> {
138        debug!(
139            target: "ethstats",
140            "Attempting to login to EthStats server as node_id {}", self.credentials.node_id
141        );
142        let conn = self.conn.read().await;
143        let conn = conn.as_ref().ok_or(EthStatsError::NotConnected)?;
144
145        let network_status = self
146            .network
147            .network_status()
148            .await
149            .map_err(|e| EthStatsError::AuthError(e.to_string()))?;
150        let id = &self.credentials.node_id;
151        let secret = &self.credentials.secret;
152        let protocol = network_status
153            .capabilities
154            .iter()
155            .map(|cap| format!("{}/{}", cap.name, cap.version))
156            .collect::<Vec<_>>()
157            .join(", ");
158        let port = self.network.local_addr().port() as u64;
159
160        let auth = AuthMsg {
161            id: id.clone(),
162            secret: secret.clone(),
163            info: NodeInfo {
164                name: id.clone(),
165                node: network_status.client_version.clone(),
166                port,
167                network: self.network.chain_id().to_string(),
168                protocol,
169                api: "No".to_string(),
170                os: std::env::consts::OS.into(),
171                os_ver: std::env::consts::ARCH.into(),
172                client: "0.1.1".to_string(),
173                history: true,
174            },
175        };
176
177        let message = auth.generate_login_message();
178        conn.write_json(&message).await?;
179
180        let response =
181            timeout(READ_TIMEOUT, conn.read_json()).await.map_err(|_| EthStatsError::Timeout)??;
182
183        if let Some(ack) = response.get("emit") &&
184            ack.get(0) == Some(&Value::String("ready".to_string()))
185        {
186            info!(
187                target: "ethstats",
188                "Login successful to EthStats server as node_id {}", self.credentials.node_id
189            );
190            return Ok(());
191        }
192
193        debug!(target: "ethstats", "Login failed: Unauthorized or unexpected login response");
194        Err(EthStatsError::AuthError("Unauthorized or unexpected login response".into()))
195    }
196
197    /// Report current node statistics to the `EthStats` server
198    ///
199    /// Sends information about the node's current state including sync status,
200    /// peer count, and uptime.
201    async fn report_stats(&self) -> Result<(), EthStatsError> {
202        let conn = self.conn.read().await;
203        let conn = conn.as_ref().ok_or(EthStatsError::NotConnected)?;
204
205        let stats_msg = StatsMsg {
206            id: self.credentials.node_id.clone(),
207            stats: NodeStats {
208                active: true,
209                syncing: self.network.is_syncing(),
210                peers: self.network.num_connected_peers() as u64,
211                gas_price: self.pool.block_info().pending_basefee,
212                uptime: 100,
213            },
214        };
215
216        let message = stats_msg.generate_stats_message();
217        conn.write_json(&message).await?;
218
219        Ok(())
220    }
221
222    /// Send a ping message to the `EthStats` server
223    ///
224    /// Records the ping time and starts a timeout task to detect if the server
225    /// doesn't respond within the expected timeframe.
226    async fn send_ping(&self) -> Result<(), EthStatsError> {
227        let conn = self.conn.read().await;
228        let conn = conn.as_ref().ok_or(EthStatsError::NotConnected)?;
229
230        let ping_time = Instant::now();
231        *self.last_ping.lock().await = Some(ping_time);
232
233        let client_time = Local::now().format("%Y-%m-%d %H:%M:%S%.f %:z %Z").to_string();
234        let ping_msg = PingMsg { id: self.credentials.node_id.clone(), client_time };
235
236        let message = ping_msg.generate_ping_message();
237        conn.write_json(&message).await?;
238
239        // Start ping timeout
240        let active_ping = self.last_ping.clone();
241        let conn_ref = self.conn.clone();
242        tokio::spawn(async move {
243            sleep(PING_TIMEOUT).await;
244            let mut active = active_ping.lock().await;
245            if active.is_some() {
246                debug!(target: "ethstats", "Ping timeout");
247                *active = None;
248                // Clear connection to trigger reconnect
249                if let Some(conn) = conn_ref.write().await.take() {
250                    let _ = conn.close().await;
251                }
252            }
253        });
254
255        Ok(())
256    }
257
258    /// Report latency measurement to the `EthStats` server
259    ///
260    /// Calculates the round-trip time from the last ping and sends it to
261    /// the server. This is called when a pong response is received.
262    async fn report_latency(&self) -> Result<(), EthStatsError> {
263        let conn = self.conn.read().await;
264        let conn = conn.as_ref().ok_or(EthStatsError::NotConnected)?;
265
266        let mut active = self.last_ping.lock().await;
267        if let Some(start) = active.take() {
268            let latency = start.elapsed().as_millis() as u64 / 2;
269
270            debug!(target: "ethstats", "Reporting latency: {}ms", latency);
271
272            let latency_msg = LatencyMsg { id: self.credentials.node_id.clone(), latency };
273
274            let message = latency_msg.generate_latency_message();
275            conn.write_json(&message).await?
276        }
277
278        Ok(())
279    }
280
281    /// Report pending transaction count to the `EthStats` server
282    ///
283    /// Gets the current number of pending transactions from the pool and
284    /// sends this information to the server.
285    async fn report_pending(&self) -> Result<(), EthStatsError> {
286        let conn = self.conn.read().await;
287        let conn = conn.as_ref().ok_or(EthStatsError::NotConnected)?;
288        let pending = self.pool.pool_size().pending as u64;
289
290        debug!(target: "ethstats", "Reporting pending txs: {}", pending);
291
292        let pending_msg =
293            PendingMsg { id: self.credentials.node_id.clone(), stats: PendingStats { pending } };
294
295        let message = pending_msg.generate_pending_message();
296        conn.write_json(&message).await?;
297
298        Ok(())
299    }
300
301    /// Report block information to the `EthStats` server
302    ///
303    /// Fetches block data either from a canonical state notification or
304    /// the current best block, converts it to stats format, and sends
305    /// it to the server.
306    ///
307    /// # Arguments
308    /// * `head` - Optional canonical state notification containing new block info
309    async fn report_block(
310        &self,
311        head: Option<CanonStateNotification<<Provider as NodePrimitivesProvider>::Primitives>>,
312    ) -> Result<(), EthStatsError> {
313        let conn = self.conn.read().await;
314        let conn = conn.as_ref().ok_or(EthStatsError::NotConnected)?;
315
316        let block_number = if let Some(head) = head {
317            head.tip().header().number()
318        } else {
319            self.provider
320                .best_block_number()
321                .map_err(|e| EthStatsError::DataFetchError(e.to_string()))?
322        };
323
324        match self.provider.block_by_id(block_number.into()) {
325            Ok(Some(block)) => {
326                let block_msg = BlockMsg {
327                    id: self.credentials.node_id.clone(),
328                    block: self.block_to_stats(&block)?,
329                };
330
331                debug!(target: "ethstats", "Reporting block: {}", block_number);
332
333                let message = block_msg.generate_block_message();
334                conn.write_json(&message).await?;
335            }
336            Ok(None) => {
337                // Block not found, stop fetching
338                debug!(target: "ethstats", "Block {} not found", block_number);
339                return Err(EthStatsError::BlockNotFound(block_number));
340            }
341            Err(e) => {
342                debug!(target: "ethstats", "Error fetching block {}: {}", block_number, e);
343                return Err(EthStatsError::DataFetchError(e.to_string()));
344            }
345        };
346
347        Ok(())
348    }
349
350    /// Report new payload information to the `EthStats` server
351    ///
352    /// Sends information about payload processing time and block details
353    /// to the server for monitoring purposes.
354    pub async fn report_new_payload(
355        &self,
356        block_hash: alloy_primitives::B256,
357        block_number: u64,
358        processing_time: Duration,
359    ) -> Result<(), EthStatsError> {
360        let conn = self.conn.read().await;
361        let conn = conn.as_ref().ok_or(EthStatsError::NotConnected)?;
362
363        let payload_stats = PayloadStats {
364            number: U256::from(block_number),
365            hash: block_hash,
366            processing_time: processing_time.as_millis() as u64,
367        };
368
369        let payload_msg =
370            PayloadMsg { id: self.credentials.node_id.clone(), payload: payload_stats };
371
372        debug!(
373            target: "ethstats",
374            "Reporting new payload: block={}, hash={:?}, processing_time={}ms",
375            block_number,
376            block_hash,
377            processing_time.as_millis()
378        );
379
380        let message = payload_msg.generate_new_payload_message();
381        conn.write_json(&message).await?;
382
383        Ok(())
384    }
385
386    /// Convert a block to `EthStats` block statistics format
387    ///
388    /// Extracts relevant information from a block and formats it according
389    /// to the `EthStats` protocol specification.
390    ///
391    /// # Arguments
392    /// * `block` - The block to convert
393    fn block_to_stats(
394        &self,
395        block: &<Provider as BlockReader>::Block,
396    ) -> Result<BlockStats, EthStatsError> {
397        let body = block.body();
398        let header = block.header();
399
400        let txs = body.transaction_hashes_iter().copied().map(|hash| TxStats { hash }).collect();
401
402        Ok(BlockStats {
403            number: U256::from(header.number()),
404            hash: header.hash_slow(),
405            parent_hash: header.parent_hash(),
406            timestamp: U256::from(header.timestamp()),
407            miner: header.beneficiary(),
408            gas_used: header.gas_used(),
409            gas_limit: header.gas_limit(),
410            diff: header.difficulty().to_string(),
411            total_diff: "0".into(),
412            txs,
413            tx_root: header.transactions_root(),
414            root: header.state_root(),
415            uncles: UncleStats(vec![]),
416        })
417    }
418
419    /// Report historical block data to the `EthStats` server
420    ///
421    /// Fetches multiple blocks by their numbers and sends their statistics
422    /// to the server. This is typically called in response to a history
423    /// request from the server.
424    ///
425    /// # Arguments
426    /// * `list` - Vector of block numbers to fetch and report
427    async fn report_history(&self, list: Option<&Vec<u64>>) -> Result<(), EthStatsError> {
428        let conn = self.conn.read().await;
429        let conn = conn.as_ref().ok_or(EthStatsError::NotConnected)?;
430
431        let indexes = if let Some(list) = list {
432            list
433        } else {
434            let best_block_number = self
435                .provider
436                .best_block_number()
437                .map_err(|e| EthStatsError::DataFetchError(e.to_string()))?;
438
439            let start = best_block_number.saturating_sub(HISTORY_UPDATE_RANGE);
440
441            &(start..=best_block_number).collect()
442        };
443
444        let mut blocks = Vec::with_capacity(indexes.len());
445        for &block_number in indexes {
446            match self.provider.block_by_id(block_number.into()) {
447                Ok(Some(block)) => {
448                    blocks.push(block);
449                }
450                Ok(None) => {
451                    // Block not found, stop fetching
452                    debug!(target: "ethstats", "Block {} not found", block_number);
453                    break;
454                }
455                Err(e) => {
456                    debug!(target: "ethstats", "Error fetching block {}: {}", block_number, e);
457                    break;
458                }
459            }
460        }
461
462        let history: Vec<BlockStats> =
463            blocks.iter().map(|block| self.block_to_stats(block)).collect::<Result<_, _>>()?;
464
465        if history.is_empty() {
466            debug!(target: "ethstats", "No history to send to stats server");
467        } else {
468            debug!(
469                target: "ethstats",
470                "Sending historical blocks to ethstats, first: {}, last: {}",
471                history.first().unwrap().number,
472                history.last().unwrap().number
473            );
474        }
475
476        let history_msg = HistoryMsg { id: self.credentials.node_id.clone(), history };
477
478        let message = history_msg.generate_history_message();
479        conn.write_json(&message).await?;
480
481        Ok(())
482    }
483
484    /// Send a complete status report to the `EthStats` server
485    ///
486    /// Performs all regular reporting tasks: ping, block info, pending
487    /// transactions, and general statistics.
488    async fn report(&self) -> Result<(), EthStatsError> {
489        self.send_ping().await?;
490        self.report_block(None).await?;
491        self.report_pending().await?;
492        self.report_stats().await?;
493
494        Ok(())
495    }
496
497    /// Handle incoming messages from the `EthStats` server
498    ///
499    /// # Expected Message Variants
500    ///
501    /// This function expects messages in the following format:
502    ///
503    /// ```json
504    /// { "emit": [<command: String>, <payload: Object>] }
505    /// ```
506    ///
507    /// ## Supported Commands:
508    ///
509    /// - `"node-pong"`: Indicates a pong response to a previously sent ping. The payload is
510    ///   ignored. Triggers a latency report to the server.
511    ///   - Example: ```json { "emit": [ "node-pong", { "clientTime": "2025-07-10 12:00:00.123
512    ///     +00:00 UTC", "serverTime": "2025-07-10 12:00:01.456 +00:00 UTC" } ] } ```
513    ///
514    /// - `"history"`: Requests historical block data. The payload may contain a `list` field with
515    ///   block numbers to fetch. If `list` is not present, the default range is used.
516    ///   - Example with list: `{ "emit": ["history", {"list": [1, 2, 3], "min": 1, "max": 3}] }`
517    ///   - Example without list: `{ "emit": ["history", {}] }`
518    ///
519    /// ## Other Commands:
520    ///
521    /// Any other command is logged as unhandled and ignored.
522    async fn handle_message(&self, msg: Value) -> Result<(), EthStatsError> {
523        let emit = match msg.get("emit") {
524            Some(emit) => emit,
525            None => {
526                debug!(target: "ethstats", "Stats server sent non-broadcast, msg {}", msg);
527                return Err(EthStatsError::InvalidRequest);
528            }
529        };
530
531        let command = match emit.get(0) {
532            Some(Value::String(command)) => command.as_str(),
533            _ => {
534                debug!(target: "ethstats", "Invalid stats server message type, msg {}", msg);
535                return Err(EthStatsError::InvalidRequest);
536            }
537        };
538
539        match command {
540            "node-pong" => {
541                self.report_latency().await?;
542            }
543            "history" => {
544                let block_numbers = emit
545                    .get(1)
546                    .and_then(|v| v.as_object())
547                    .and_then(|obj| obj.get("list"))
548                    .and_then(|v| v.as_array());
549
550                if block_numbers.is_none() {
551                    self.report_history(None).await?;
552
553                    return Ok(());
554                }
555
556                let block_numbers = block_numbers
557                    .unwrap()
558                    .iter()
559                    .map(|val| {
560                        val.as_u64().ok_or_else(|| {
561                            debug!(
562                                target: "ethstats",
563                                "Invalid stats history block number, msg {}", msg
564                            );
565                            EthStatsError::InvalidRequest
566                        })
567                    })
568                    .collect::<Result<_, _>>()?;
569
570                self.report_history(Some(&block_numbers)).await?;
571            }
572            other => debug!(target: "ethstats", "Unhandled command: {}", other),
573        }
574
575        Ok(())
576    }
577
578    /// Main service loop that handles all `EthStats` communication
579    ///
580    /// This method runs the main event loop that:
581    /// - Maintains the `WebSocket` connection
582    /// - Handles incoming messages from the server
583    /// - Reports statistics at regular intervals
584    /// - Processes new block notifications
585    /// - Automatically reconnects when the connection is lost
586    ///
587    /// The service runs until explicitly shut down or an unrecoverable
588    /// error occurs.
589    pub async fn run(self) {
590        // Create channels for internal communication
591        let (shutdown_tx, mut shutdown_rx) = mpsc::channel(1);
592        let (message_tx, mut message_rx) = mpsc::channel(32);
593        let (head_tx, mut head_rx) = mpsc::channel(10);
594
595        // Start the read loop in a separate task
596        let read_handle = {
597            let conn_arc = self.conn.clone();
598            let message_tx = message_tx.clone();
599            let shutdown_tx = shutdown_tx.clone();
600
601            tokio::spawn(async move {
602                loop {
603                    let conn_guard = conn_arc.read().await;
604                    if let Some(conn) = conn_guard.as_ref() {
605                        match conn.read_json().await {
606                            Ok(msg) => {
607                                if message_tx.send(msg).await.is_err() {
608                                    break;
609                                }
610                            }
611                            Err(e) => match e {
612                                crate::error::ConnectionError::Serialization(err) => {
613                                    debug!(target: "ethstats", "JSON parse error from stats server: {}", err);
614                                }
615                                other => {
616                                    debug!(target: "ethstats", "Read error: {}", other);
617                                    drop(conn_guard);
618                                    if let Some(conn) = conn_arc.write().await.take() {
619                                        let _ = conn.close().await;
620                                    }
621                                }
622                            },
623                        }
624                    } else {
625                        sleep(RECONNECT_INTERVAL).await;
626                    }
627                }
628
629                let _ = shutdown_tx.send(()).await;
630            })
631        };
632
633        let canonical_stream_handle = {
634            let mut canonical_stream = self.provider.canonical_state_stream();
635            let head_tx = head_tx.clone();
636            let shutdown_tx = shutdown_tx.clone();
637
638            tokio::spawn(async move {
639                loop {
640                    let head = canonical_stream.next().await;
641                    if let Some(head) = head &&
642                        head_tx.send(head).await.is_err()
643                    {
644                        break;
645                    }
646                }
647
648                let _ = shutdown_tx.send(()).await;
649            })
650        };
651
652        let mut pending_tx_receiver = self.pool.pending_transactions_listener();
653
654        // Set up intervals
655        let mut report_interval = interval(REPORT_INTERVAL);
656        let mut reconnect_interval = interval(RECONNECT_INTERVAL);
657
658        // Main event loop using select!
659        loop {
660            tokio::select! {
661                // Handle shutdown signal
662                _ = shutdown_rx.recv() => {
663                    info!(target: "ethstats", "Shutting down ethstats service");
664                    break;
665                }
666
667                // Handle messages from the read loop
668                Some(msg) = message_rx.recv() => {
669                    if let Err(e) = self.handle_message(msg).await {
670                        debug!(target: "ethstats", "Error handling message: {}", e);
671                        self.disconnect().await;
672                    }
673                }
674
675                // Handle new block
676                Some(head) = head_rx.recv() => {
677                    if let Err(e) = self.report_block(Some(head)).await {
678                        debug!(target: "ethstats", "Failed to report block: {}", e);
679                        self.disconnect().await;
680                    }
681
682                    if let Err(e) = self.report_pending().await {
683                        debug!(target: "ethstats", "Failed to report pending: {}", e);
684                        self.disconnect().await;
685                    }
686                }
687
688                // Handle new pending tx
689                _= pending_tx_receiver.recv() => {
690                    if let Err(e) = self.report_pending().await {
691                        debug!(target: "ethstats", "Failed to report pending: {}", e);
692                        self.disconnect().await;
693                    }
694                }
695
696                // Handle stats reporting
697                _ = report_interval.tick() => {
698                    if let Err(e) = self.report().await {
699                        debug!(target: "ethstats", "Failed to report: {}", e);
700                        self.disconnect().await;
701                    }
702                }
703
704                // Handle reconnection
705                _ = reconnect_interval.tick() => {
706                    if self.conn.read().await.is_none() {
707                        match self.connect().await {
708                            Ok(_) => info!(target: "ethstats", "Reconnected successfully"),
709                            Err(e) => debug!(target: "ethstats", "Reconnect failed: {}", e),
710                        }
711                    }
712                }
713            }
714        }
715
716        // Cleanup
717        self.disconnect().await;
718
719        // Cancel background tasks
720        read_handle.abort();
721        canonical_stream_handle.abort();
722    }
723
724    /// Gracefully close the `WebSocket` connection
725    ///
726    /// Attempts to close the connection cleanly and logs any errors
727    /// that occur during the process.
728    async fn disconnect(&self) {
729        if let Some(conn) = self.conn.write().await.take() &&
730            let Err(e) = conn.close().await
731        {
732            debug!(target: "ethstats", "Error closing connection: {}", e);
733        }
734    }
735
736    /// Test helper to check connection status
737    #[cfg(test)]
738    pub async fn is_connected(&self) -> bool {
739        self.conn.read().await.is_some()
740    }
741}
742
#[cfg(test)]
mod tests {
    use super::*;
    use futures_util::{SinkExt, StreamExt};
    use reth_network_api::noop::NoopNetwork;
    use reth_storage_api::noop::NoopProvider;
    use reth_transaction_pool::noop::NoopTransactionPool;
    use serde_json::json;
    use tokio::net::TcpListener;
    use tokio_tungstenite::tungstenite::protocol::{frame::Utf8Bytes, Message};

    const TEST_HOST: &str = "127.0.0.1";
    const TEST_PORT: u16 = 0; // Let OS choose port

    /// Starts a single-connection mock `EthStats` server on an OS-chosen port.
    ///
    /// The server answers a `hello` login with `ready` and every text frame containing
    /// `node-ping` with a `node-pong`. Returns the bound address and the server task.
    async fn setup_mock_server() -> (String, tokio::task::JoinHandle<()>) {
        let listener = TcpListener::bind((TEST_HOST, TEST_PORT)).await.unwrap();
        let bound_addr = listener.local_addr().unwrap();

        let server_task = tokio::spawn(async move {
            let (tcp, _) = listener.accept().await.unwrap();
            let mut ws = tokio_tungstenite::accept_async(tcp).await.unwrap();

            // Login: the first text frame must be a `hello` emit to be acknowledged.
            if let Some(Ok(Message::Text(text))) = ws.next().await {
                let login: serde_json::Value = serde_json::from_str(&text).unwrap();
                if login["emit"][0] == "hello" {
                    let ready = json!({
                        "emit": ["ready", []]
                    });
                    ws.send(Message::Text(Utf8Bytes::from(ready.to_string()))).await.unwrap();
                }
            }

            // Pings: answer until the client goes away or a frame fails to read.
            while let Some(Ok(frame)) = ws.next().await {
                let is_ping =
                    matches!(&frame, Message::Text(text) if text.contains("node-ping"));
                if is_ping {
                    let pong = json!({
                        "emit": ["node-pong", {"id": "test-node"}]
                    });
                    ws.send(Message::Text(Utf8Bytes::from(pong.to_string()))).await.unwrap();
                }
            }
        });

        (bound_addr.to_string(), server_task)
    }

    #[tokio::test]
    async fn test_connection_and_login() {
        let (server_addr, server_task) = setup_mock_server().await;
        let connection_string = format!("test-node:test-secret@{server_addr}");

        let service = EthStatsService::new(
            &connection_string,
            NoopNetwork::default(),
            NoopProvider::default(),
            NoopTransactionPool::default(),
        )
        .await
        .expect("Service should connect");

        // Verify connection was established
        let connected = service.is_connected().await;
        assert!(connected, "Service should be connected");

        // Clean up server
        server_task.abort();
    }

    #[tokio::test]
    async fn test_history_command_handling() {
        let (server_addr, server_task) = setup_mock_server().await;
        let connection_string = format!("test-node:test-secret@{server_addr}");

        let service = EthStatsService::new(
            &connection_string,
            NoopNetwork::default(),
            NoopProvider::default(),
            NoopTransactionPool::default(),
        )
        .await
        .expect("Service should connect");

        // Simulate receiving a history command
        let request = json!({
            "emit": ["history", {"list": [1, 2, 3]}]
        });
        let outcome = service.handle_message(request).await;
        outcome.expect("History command should be handled");

        // Clean up server
        server_task.abort();
    }

    #[tokio::test]
    async fn test_invalid_url_handling() {
        let network = NoopNetwork::default();
        let provider = NoopProvider::default();
        let pool = NoopTransactionPool::default();

        // Test missing secret
        let missing_secret = EthStatsService::new(
            "test-node@localhost",
            network.clone(),
            provider.clone(),
            pool.clone(),
        )
        .await;
        assert!(
            matches!(missing_secret, Err(EthStatsError::InvalidUrl(_))),
            "Should detect invalid URL format"
        );

        // Test invalid URL format
        let malformed = EthStatsService::new("invalid-url", network, provider, pool).await;
        assert!(
            matches!(malformed, Err(EthStatsError::InvalidUrl(_))),
            "Should detect invalid URL format"
        );
    }
}