reth_node_ethstats/
ethstats.rs

1use crate::{
2    connection::ConnWrapper,
3    credentials::EthstatsCredentials,
4    error::EthStatsError,
5    events::{
6        AuthMsg, BlockMsg, BlockStats, HistoryMsg, LatencyMsg, NodeInfo, NodeStats, PendingMsg,
7        PendingStats, PingMsg, StatsMsg, TxStats, UncleStats,
8    },
9};
10use alloy_consensus::{BlockHeader, Sealable};
11use alloy_primitives::U256;
12use reth_chain_state::{CanonStateNotification, CanonStateSubscriptions};
13use reth_network_api::{NetworkInfo, Peers};
14use reth_primitives_traits::{Block, BlockBody};
15use reth_storage_api::{BlockReader, BlockReaderIdExt, NodePrimitivesProvider};
16use reth_transaction_pool::TransactionPool;
17
18use chrono::Local;
19use serde_json::Value;
20use std::{
21    str::FromStr,
22    sync::Arc,
23    time::{Duration, Instant},
24};
25use tokio::{
26    sync::{mpsc, Mutex, RwLock},
27    time::{interval, sleep, timeout},
28};
29use tokio_stream::StreamExt;
30use tokio_tungstenite::connect_async;
31use tracing::{debug, info};
32use url::Url;
33
34/// Number of historical blocks to include in a history update sent to the `EthStats` server
35const HISTORY_UPDATE_RANGE: u64 = 50;
36/// Duration to wait before attempting to reconnect to the `EthStats` server
37const RECONNECT_INTERVAL: Duration = Duration::from_secs(5);
38/// Maximum time to wait for a ping response from the server
39const PING_TIMEOUT: Duration = Duration::from_secs(5);
40/// Interval between regular stats reports to the server
41const REPORT_INTERVAL: Duration = Duration::from_secs(15);
42/// Maximum time to wait for initial connection establishment
43const CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
44/// Maximum time to wait for reading messages from the server
45const READ_TIMEOUT: Duration = Duration::from_secs(30);
46
47/// Main service for interacting with an `EthStats` server
48///
49/// This service handles all communication with the `EthStats` server including
50/// authentication, stats reporting, block notifications, and connection management.
51/// It maintains a persistent `WebSocket` connection and automatically reconnects
52/// when the connection is lost.
53#[derive(Debug)]
54pub struct EthStatsService<Network, Provider, Pool> {
55    /// Authentication credentials for the `EthStats` server
56    credentials: EthstatsCredentials,
57    /// `WebSocket` connection wrapper, wrapped in `Arc<RwLock>` for shared access
58    conn: Arc<RwLock<Option<ConnWrapper>>>,
59    /// Timestamp of the last ping sent to the server
60    last_ping: Arc<Mutex<Option<Instant>>>,
61    /// Network interface for getting peer and sync information
62    network: Network,
63    /// Blockchain provider for reading block data and state
64    provider: Provider,
65    /// Transaction pool for getting pending transaction statistics
66    pool: Pool,
67}
68
69impl<Network, Provider, Pool> EthStatsService<Network, Provider, Pool>
70where
71    Network: NetworkInfo + Peers,
72    Provider: BlockReaderIdExt + CanonStateSubscriptions,
73    Pool: TransactionPool,
74{
75    /// Create a new `EthStats` service and establish initial connection
76    ///
77    /// # Arguments
78    /// * `url` - Connection string in format "`node_id:secret@host`"
79    /// * `network` - Network interface implementation
80    /// * `provider` - Blockchain provider implementation
81    /// * `pool` - Transaction pool implementation
82    pub async fn new(
83        url: &str,
84        network: Network,
85        provider: Provider,
86        pool: Pool,
87    ) -> Result<Self, EthStatsError> {
88        let credentials = EthstatsCredentials::from_str(url)?;
89        let service = Self {
90            credentials,
91            conn: Arc::new(RwLock::new(None)),
92            last_ping: Arc::new(Mutex::new(None)),
93            network,
94            provider,
95            pool,
96        };
97        service.connect().await?;
98
99        Ok(service)
100    }
101
102    /// Establish `WebSocket` connection to the `EthStats` server
103    ///
104    /// Attempts to connect to the server using the credentials and handles
105    /// connection timeouts and errors.
106    async fn connect(&self) -> Result<(), EthStatsError> {
107        debug!(
108            target: "ethstats",
109            "Attempting to connect to EthStats server at {}", self.credentials.host
110        );
111        let full_url = format!("ws://{}/api", self.credentials.host);
112        let url = Url::parse(&full_url)
113            .map_err(|e| EthStatsError::InvalidUrl(format!("Invalid URL: {full_url} - {e}")))?;
114
115        match timeout(CONNECT_TIMEOUT, connect_async(url.to_string())).await {
116            Ok(Ok((ws_stream, _))) => {
117                debug!(
118                    target: "ethstats",
119                    "Successfully connected to EthStats server at {}", self.credentials.host
120                );
121                let conn: ConnWrapper = ConnWrapper::new(ws_stream);
122                *self.conn.write().await = Some(conn.clone());
123                self.login().await?;
124                Ok(())
125            }
126            Ok(Err(e)) => Err(EthStatsError::InvalidUrl(e.to_string())),
127            Err(_) => {
128                debug!(target: "ethstats", "Connection to EthStats server timed out");
129                Err(EthStatsError::Timeout)
130            }
131        }
132    }
133
134    /// Authenticate with the `EthStats` server
135    ///
136    /// Sends authentication credentials and node information to the server
137    /// and waits for a successful acknowledgment.
138    async fn login(&self) -> Result<(), EthStatsError> {
139        debug!(
140            target: "ethstats",
141            "Attempting to login to EthStats server as node_id {}", self.credentials.node_id
142        );
143        let conn = self.conn.read().await;
144        let conn = conn.as_ref().ok_or(EthStatsError::NotConnected)?;
145
146        let network_status = self
147            .network
148            .network_status()
149            .await
150            .map_err(|e| EthStatsError::AuthError(e.to_string()))?;
151        let id = &self.credentials.node_id;
152        let secret = &self.credentials.secret;
153        let protocol = network_status
154            .capabilities
155            .iter()
156            .map(|cap| format!("{}/{}", cap.name, cap.version))
157            .collect::<Vec<_>>()
158            .join(", ");
159        let port = self.network.local_addr().port() as u64;
160
161        let auth = AuthMsg {
162            id: id.clone(),
163            secret: secret.clone(),
164            info: NodeInfo {
165                name: id.clone(),
166                node: network_status.client_version.clone(),
167                port,
168                network: self.network.chain_id().to_string(),
169                protocol,
170                api: "No".to_string(),
171                os: std::env::consts::OS.into(),
172                os_ver: std::env::consts::ARCH.into(),
173                client: "0.1.1".to_string(),
174                history: true,
175            },
176        };
177
178        let message = auth.generate_login_message();
179        conn.write_json(&message).await?;
180
181        let response =
182            timeout(READ_TIMEOUT, conn.read_json()).await.map_err(|_| EthStatsError::Timeout)??;
183
184        if let Some(ack) = response.get("emit") {
185            if ack.get(0) == Some(&Value::String("ready".to_string())) {
186                info!(
187                    target: "ethstats",
188                    "Login successful to EthStats server as node_id {}", self.credentials.node_id
189                );
190                return Ok(());
191            }
192        }
193
194        debug!(target: "ethstats", "Login failed: Unauthorized or unexpected login response");
195        Err(EthStatsError::AuthError("Unauthorized or unexpected login response".into()))
196    }
197
198    /// Report current node statistics to the `EthStats` server
199    ///
200    /// Sends information about the node's current state including sync status,
201    /// peer count, and uptime.
202    async fn report_stats(&self) -> Result<(), EthStatsError> {
203        let conn = self.conn.read().await;
204        let conn = conn.as_ref().ok_or(EthStatsError::NotConnected)?;
205
206        let stats_msg = StatsMsg {
207            id: self.credentials.node_id.clone(),
208            stats: NodeStats {
209                active: true,
210                syncing: self.network.is_syncing(),
211                peers: self.network.num_connected_peers() as u64,
212                gas_price: 0, // TODO
213                uptime: 100,
214            },
215        };
216
217        let message = stats_msg.generate_stats_message();
218        conn.write_json(&message).await?;
219
220        Ok(())
221    }
222
223    /// Send a ping message to the `EthStats` server
224    ///
225    /// Records the ping time and starts a timeout task to detect if the server
226    /// doesn't respond within the expected timeframe.
227    async fn send_ping(&self) -> Result<(), EthStatsError> {
228        let conn = self.conn.read().await;
229        let conn = conn.as_ref().ok_or(EthStatsError::NotConnected)?;
230
231        let ping_time = Instant::now();
232        *self.last_ping.lock().await = Some(ping_time);
233
234        let client_time = Local::now().format("%Y-%m-%d %H:%M:%S%.f %:z %Z").to_string();
235        let ping_msg = PingMsg { id: self.credentials.node_id.clone(), client_time };
236
237        let message = ping_msg.generate_ping_message();
238        conn.write_json(&message).await?;
239
240        // Start ping timeout
241        let active_ping = self.last_ping.clone();
242        let conn_ref = self.conn.clone();
243        tokio::spawn(async move {
244            sleep(PING_TIMEOUT).await;
245            let mut active = active_ping.lock().await;
246            if active.is_some() {
247                debug!(target: "ethstats", "Ping timeout");
248                *active = None;
249                // Clear connection to trigger reconnect
250                if let Some(conn) = conn_ref.write().await.take() {
251                    let _ = conn.close().await;
252                }
253            }
254        });
255
256        Ok(())
257    }
258
259    /// Report latency measurement to the `EthStats` server
260    ///
261    /// Calculates the round-trip time from the last ping and sends it to
262    /// the server. This is called when a pong response is received.
263    async fn report_latency(&self) -> Result<(), EthStatsError> {
264        let conn = self.conn.read().await;
265        let conn = conn.as_ref().ok_or(EthStatsError::NotConnected)?;
266
267        let mut active = self.last_ping.lock().await;
268        if let Some(start) = active.take() {
269            let latency = start.elapsed().as_millis() as u64 / 2;
270
271            debug!(target: "ethstats", "Reporting latency: {}ms", latency);
272
273            let latency_msg = LatencyMsg { id: self.credentials.node_id.clone(), latency };
274
275            let message = latency_msg.generate_latency_message();
276            conn.write_json(&message).await?
277        }
278
279        Ok(())
280    }
281
282    /// Report pending transaction count to the `EthStats` server
283    ///
284    /// Gets the current number of pending transactions from the pool and
285    /// sends this information to the server.
286    async fn report_pending(&self) -> Result<(), EthStatsError> {
287        let conn = self.conn.read().await;
288        let conn = conn.as_ref().ok_or(EthStatsError::NotConnected)?;
289        let pending = self.pool.pool_size().pending as u64;
290
291        debug!(target: "ethstats", "Reporting pending txs: {}", pending);
292
293        let pending_msg =
294            PendingMsg { id: self.credentials.node_id.clone(), stats: PendingStats { pending } };
295
296        let message = pending_msg.generate_pending_message();
297        conn.write_json(&message).await?;
298
299        Ok(())
300    }
301
302    /// Report block information to the `EthStats` server
303    ///
304    /// Fetches block data either from a canonical state notification or
305    /// the current best block, converts it to stats format, and sends
306    /// it to the server.
307    ///
308    /// # Arguments
309    /// * `head` - Optional canonical state notification containing new block info
310    async fn report_block(
311        &self,
312        head: Option<CanonStateNotification<<Provider as NodePrimitivesProvider>::Primitives>>,
313    ) -> Result<(), EthStatsError> {
314        let conn = self.conn.read().await;
315        let conn = conn.as_ref().ok_or(EthStatsError::NotConnected)?;
316
317        let block_number = if let Some(head) = head {
318            head.tip().header().number()
319        } else {
320            self.provider
321                .best_block_number()
322                .map_err(|e| EthStatsError::DataFetchError(e.to_string()))?
323        };
324
325        match self.provider.block_by_id(block_number.into()) {
326            Ok(Some(block)) => {
327                let block_msg = BlockMsg {
328                    id: self.credentials.node_id.clone(),
329                    block: self.block_to_stats(&block)?,
330                };
331
332                debug!(target: "ethstats", "Reporting block: {}", block_number);
333
334                let message = block_msg.generate_block_message();
335                conn.write_json(&message).await?;
336            }
337            Ok(None) => {
338                // Block not found, stop fetching
339                debug!(target: "ethstats", "Block {} not found", block_number);
340                return Err(EthStatsError::BlockNotFound(block_number));
341            }
342            Err(e) => {
343                debug!(target: "ethstats", "Error fetching block {}: {}", block_number, e);
344                return Err(EthStatsError::DataFetchError(e.to_string()));
345            }
346        };
347
348        Ok(())
349    }
350
351    /// Convert a block to `EthStats` block statistics format
352    ///
353    /// Extracts relevant information from a block and formats it according
354    /// to the `EthStats` protocol specification.
355    ///
356    /// # Arguments
357    /// * `block` - The block to convert
358    fn block_to_stats(
359        &self,
360        block: &<Provider as BlockReader>::Block,
361    ) -> Result<BlockStats, EthStatsError> {
362        let body = block.body();
363        let header = block.header();
364
365        let txs = body.transaction_hashes_iter().copied().map(|hash| TxStats { hash }).collect();
366
367        Ok(BlockStats {
368            number: U256::from(header.number()),
369            hash: header.hash_slow(),
370            parent_hash: header.parent_hash(),
371            timestamp: U256::from(header.timestamp()),
372            miner: header.beneficiary(),
373            gas_used: header.gas_used(),
374            gas_limit: header.gas_limit(),
375            diff: header.difficulty().to_string(),
376            total_diff: "0".into(),
377            txs,
378            tx_root: header.transactions_root(),
379            root: header.state_root(),
380            uncles: UncleStats(vec![]),
381        })
382    }
383
384    /// Report historical block data to the `EthStats` server
385    ///
386    /// Fetches multiple blocks by their numbers and sends their statistics
387    /// to the server. This is typically called in response to a history
388    /// request from the server.
389    ///
390    /// # Arguments
391    /// * `list` - Vector of block numbers to fetch and report
392    async fn report_history(&self, list: Option<&Vec<u64>>) -> Result<(), EthStatsError> {
393        let conn = self.conn.read().await;
394        let conn = conn.as_ref().ok_or(EthStatsError::NotConnected)?;
395
396        let indexes = if let Some(list) = list {
397            list
398        } else {
399            let best_block_number = self
400                .provider
401                .best_block_number()
402                .map_err(|e| EthStatsError::DataFetchError(e.to_string()))?;
403
404            let start = best_block_number.saturating_sub(HISTORY_UPDATE_RANGE);
405
406            &(start..=best_block_number).collect()
407        };
408
409        let mut blocks = Vec::with_capacity(indexes.len());
410        for &block_number in indexes {
411            match self.provider.block_by_id(block_number.into()) {
412                Ok(Some(block)) => {
413                    blocks.push(block);
414                }
415                Ok(None) => {
416                    // Block not found, stop fetching
417                    debug!(target: "ethstats", "Block {} not found", block_number);
418                    break;
419                }
420                Err(e) => {
421                    debug!(target: "ethstats", "Error fetching block {}: {}", block_number, e);
422                    break;
423                }
424            }
425        }
426
427        let history: Vec<BlockStats> =
428            blocks.iter().map(|block| self.block_to_stats(block)).collect::<Result<_, _>>()?;
429
430        if history.is_empty() {
431            debug!(target: "ethstats", "No history to send to stats server");
432        } else {
433            debug!(
434                target: "ethstats",
435                "Sending historical blocks to ethstats, first: {}, last: {}",
436                history.first().unwrap().number,
437                history.last().unwrap().number
438            );
439        }
440
441        let history_msg = HistoryMsg { id: self.credentials.node_id.clone(), history };
442
443        let message = history_msg.generate_history_message();
444        conn.write_json(&message).await?;
445
446        Ok(())
447    }
448
449    /// Send a complete status report to the `EthStats` server
450    ///
451    /// Performs all regular reporting tasks: ping, block info, pending
452    /// transactions, and general statistics.
453    async fn report(&self) -> Result<(), EthStatsError> {
454        self.send_ping().await?;
455        self.report_block(None).await?;
456        self.report_pending().await?;
457        self.report_stats().await?;
458
459        Ok(())
460    }
461
462    /// Handle incoming messages from the `EthStats` server
463    ///
464    /// # Expected Message Variants
465    ///
466    /// This function expects messages in the following format:
467    ///
468    /// ```json
469    /// { "emit": [<command: String>, <payload: Object>] }
470    /// ```
471    ///
472    /// ## Supported Commands:
473    ///
474    /// - `"node-pong"`: Indicates a pong response to a previously sent ping. The payload is
475    ///   ignored. Triggers a latency report to the server.
476    ///   - Example: ```json { "emit": [ "node-pong", { "clientTime": "2025-07-10 12:00:00.123
477    ///     +00:00 UTC", "serverTime": "2025-07-10 12:00:01.456 +00:00 UTC" } ] } ```
478    ///
479    /// - `"history"`: Requests historical block data. The payload may contain a `list` field with
480    ///   block numbers to fetch. If `list` is not present, the default range is used.
481    ///   - Example with list: `{ "emit": ["history", {"list": [1, 2, 3], "min": 1, "max": 3}] }`
482    ///   - Example without list: `{ "emit": ["history", {}] }`
483    ///
484    /// ## Other Commands:
485    ///
486    /// Any other command is logged as unhandled and ignored.
487    async fn handle_message(&self, msg: Value) -> Result<(), EthStatsError> {
488        let emit = match msg.get("emit") {
489            Some(emit) => emit,
490            None => {
491                debug!(target: "ethstats", "Stats server sent non-broadcast, msg {}", msg);
492                return Err(EthStatsError::InvalidRequest);
493            }
494        };
495
496        let command = match emit.get(0) {
497            Some(Value::String(command)) => command.as_str(),
498            _ => {
499                debug!(target: "ethstats", "Invalid stats server message type, msg {}", msg);
500                return Err(EthStatsError::InvalidRequest);
501            }
502        };
503
504        match command {
505            "node-pong" => {
506                self.report_latency().await?;
507            }
508            "history" => {
509                let block_numbers = emit
510                    .get(1)
511                    .and_then(|v| v.as_object())
512                    .and_then(|obj| obj.get("list"))
513                    .and_then(|v| v.as_array());
514
515                if block_numbers.is_none() {
516                    self.report_history(None).await?;
517
518                    return Ok(());
519                }
520
521                let block_numbers = block_numbers
522                    .unwrap()
523                    .iter()
524                    .map(|val| {
525                        val.as_u64().ok_or_else(|| {
526                            debug!(
527                                target: "ethstats",
528                                "Invalid stats history block number, msg {}", msg
529                            );
530                            EthStatsError::InvalidRequest
531                        })
532                    })
533                    .collect::<Result<_, _>>()?;
534
535                self.report_history(Some(&block_numbers)).await?;
536            }
537            other => debug!(target: "ethstats", "Unhandled command: {}", other),
538        }
539
540        Ok(())
541    }
542
543    /// Main service loop that handles all `EthStats` communication
544    ///
545    /// This method runs the main event loop that:
546    /// - Maintains the `WebSocket` connection
547    /// - Handles incoming messages from the server
548    /// - Reports statistics at regular intervals
549    /// - Processes new block notifications
550    /// - Automatically reconnects when the connection is lost
551    ///
552    /// The service runs until explicitly shut down or an unrecoverable
553    /// error occurs.
554    pub async fn run(self) {
555        // Create channels for internal communication
556        let (shutdown_tx, mut shutdown_rx) = mpsc::channel(1);
557        let (message_tx, mut message_rx) = mpsc::channel(32);
558        let (head_tx, mut head_rx) = mpsc::channel(10);
559
560        // Start the read loop in a separate task
561        let read_handle = {
562            let conn = self.conn.clone();
563            let message_tx = message_tx.clone();
564            let shutdown_tx = shutdown_tx.clone();
565
566            tokio::spawn(async move {
567                loop {
568                    let conn = conn.read().await;
569                    if let Some(conn) = conn.as_ref() {
570                        match conn.read_json().await {
571                            Ok(msg) => {
572                                if message_tx.send(msg).await.is_err() {
573                                    break;
574                                }
575                            }
576                            Err(e) => {
577                                debug!(target: "ethstats", "Read error: {}", e);
578                                break;
579                            }
580                        }
581                    } else {
582                        sleep(RECONNECT_INTERVAL).await;
583                    }
584                }
585
586                let _ = shutdown_tx.send(()).await;
587            })
588        };
589
590        let canonical_stream_handle = {
591            let mut canonical_stream = self.provider.canonical_state_stream();
592            let head_tx = head_tx.clone();
593            let shutdown_tx = shutdown_tx.clone();
594
595            tokio::spawn(async move {
596                loop {
597                    let head = canonical_stream.next().await;
598                    if let Some(head) = head {
599                        if head_tx.send(head).await.is_err() {
600                            break;
601                        }
602                    }
603                }
604
605                let _ = shutdown_tx.send(()).await;
606            })
607        };
608
609        let mut pending_tx_receiver = self.pool.pending_transactions_listener();
610
611        // Set up intervals
612        let mut report_interval = interval(REPORT_INTERVAL);
613        let mut reconnect_interval = interval(RECONNECT_INTERVAL);
614
615        // Main event loop using select!
616        loop {
617            tokio::select! {
618                // Handle shutdown signal
619                _ = shutdown_rx.recv() => {
620                    info!(target: "ethstats", "Shutting down ethstats service");
621                    break;
622                }
623
624                // Handle messages from the read loop
625                Some(msg) = message_rx.recv() => {
626                    if let Err(e) = self.handle_message(msg).await {
627                        debug!(target: "ethstats", "Error handling message: {}", e);
628                        self.disconnect().await;
629                    }
630                }
631
632                // Handle new block
633                Some(head) = head_rx.recv() => {
634                    if let Err(e) = self.report_block(Some(head)).await {
635                        debug!(target: "ethstats", "Failed to report block: {}", e);
636                        self.disconnect().await;
637                    }
638
639                    if let Err(e) = self.report_pending().await {
640                        debug!(target: "ethstats", "Failed to report pending: {}", e);
641                        self.disconnect().await;
642                    }
643                }
644
645                // Handle new pending tx
646                _= pending_tx_receiver.recv() => {
647                    if let Err(e) = self.report_pending().await {
648                        debug!(target: "ethstats", "Failed to report pending: {}", e);
649                        self.disconnect().await;
650                    }
651                }
652
653                // Handle stats reporting
654                _ = report_interval.tick() => {
655                    if let Err(e) = self.report().await {
656                        debug!(target: "ethstats", "Failed to report: {}", e);
657                        self.disconnect().await;
658                    }
659                }
660
661                // Handle reconnection
662                _ = reconnect_interval.tick(), if self.conn.read().await.is_none() => {
663                    match self.connect().await {
664                        Ok(_) => info!(target: "ethstats", "Reconnected successfully"),
665                        Err(e) => debug!(target: "ethstats", "Reconnect failed: {}", e),
666                    }
667                }
668            }
669        }
670
671        // Cleanup
672        self.disconnect().await;
673
674        // Cancel background tasks
675        read_handle.abort();
676        canonical_stream_handle.abort();
677    }
678
679    /// Gracefully close the `WebSocket` connection
680    ///
681    /// Attempts to close the connection cleanly and logs any errors
682    /// that occur during the process.
683    async fn disconnect(&self) {
684        if let Some(conn) = self.conn.write().await.take() {
685            if let Err(e) = conn.close().await {
686                debug!(target: "ethstats", "Error closing connection: {}", e);
687            }
688        }
689    }
690
691    /// Test helper to check connection status
692    #[cfg(test)]
693    pub async fn is_connected(&self) -> bool {
694        self.conn.read().await.is_some()
695    }
696}
697
698#[cfg(test)]
699mod tests {
700    use super::*;
701    use futures_util::{SinkExt, StreamExt};
702    use reth_network_api::noop::NoopNetwork;
703    use reth_storage_api::noop::NoopProvider;
704    use reth_transaction_pool::noop::NoopTransactionPool;
705    use serde_json::json;
706    use tokio::net::TcpListener;
707    use tokio_tungstenite::tungstenite::protocol::{frame::Utf8Bytes, Message};
708
709    const TEST_HOST: &str = "127.0.0.1";
710    const TEST_PORT: u16 = 0; // Let OS choose port
711
712    async fn setup_mock_server() -> (String, tokio::task::JoinHandle<()>) {
713        let listener = TcpListener::bind((TEST_HOST, TEST_PORT)).await.unwrap();
714        let addr = listener.local_addr().unwrap();
715
716        let handle = tokio::spawn(async move {
717            let (stream, _) = listener.accept().await.unwrap();
718            let mut ws_stream = tokio_tungstenite::accept_async(stream).await.unwrap();
719
720            // Handle login
721            if let Some(Ok(Message::Text(text))) = ws_stream.next().await {
722                let value: serde_json::Value = serde_json::from_str(&text).unwrap();
723                if value["emit"][0] == "hello" {
724                    let response = json!({
725                        "emit": ["ready", []]
726                    });
727                    ws_stream
728                        .send(Message::Text(Utf8Bytes::from(response.to_string())))
729                        .await
730                        .unwrap();
731                }
732            }
733
734            // Handle ping
735            while let Some(Ok(msg)) = ws_stream.next().await {
736                if let Message::Text(text) = msg {
737                    if text.contains("node-ping") {
738                        let pong = json!({
739                            "emit": ["node-pong", {"id": "test-node"}]
740                        });
741                        ws_stream
742                            .send(Message::Text(Utf8Bytes::from(pong.to_string())))
743                            .await
744                            .unwrap();
745                    }
746                }
747            }
748        });
749
750        (addr.to_string(), handle)
751    }
752
753    #[tokio::test]
754    async fn test_connection_and_login() {
755        let (server_url, server_handle) = setup_mock_server().await;
756        let ethstats_url = format!("test-node:test-secret@{server_url}");
757
758        let network = NoopNetwork::default();
759        let provider = NoopProvider::default();
760        let pool = NoopTransactionPool::default();
761
762        let service = EthStatsService::new(&ethstats_url, network, provider, pool)
763            .await
764            .expect("Service should connect");
765
766        // Verify connection was established
767        assert!(service.is_connected().await, "Service should be connected");
768
769        // Clean up server
770        server_handle.abort();
771    }
772
773    #[tokio::test]
774    async fn test_history_command_handling() {
775        let (server_url, server_handle) = setup_mock_server().await;
776        let ethstats_url = format!("test-node:test-secret@{server_url}");
777
778        let network = NoopNetwork::default();
779        let provider = NoopProvider::default();
780        let pool = NoopTransactionPool::default();
781
782        let service = EthStatsService::new(&ethstats_url, network, provider, pool)
783            .await
784            .expect("Service should connect");
785
786        // Simulate receiving a history command
787        let history_cmd = json!({
788            "emit": ["history", {"list": [1, 2, 3]}]
789        });
790
791        service.handle_message(history_cmd).await.expect("History command should be handled");
792
793        // Clean up server
794        server_handle.abort();
795    }
796
797    #[tokio::test]
798    async fn test_invalid_url_handling() {
799        let network = NoopNetwork::default();
800        let provider = NoopProvider::default();
801        let pool = NoopTransactionPool::default();
802
803        // Test missing secret
804        let result = EthStatsService::new(
805            "test-node@localhost",
806            network.clone(),
807            provider.clone(),
808            pool.clone(),
809        )
810        .await;
811        assert!(
812            matches!(result, Err(EthStatsError::InvalidUrl(_))),
813            "Should detect invalid URL format"
814        );
815
816        // Test invalid URL format
817        let result = EthStatsService::new("invalid-url", network, provider, pool).await;
818        assert!(
819            matches!(result, Err(EthStatsError::InvalidUrl(_))),
820            "Should detect invalid URL format"
821        );
822    }
823}