Skip to main content

reth_node_ethstats/
ethstats.rs

1use crate::{
2    connection::ConnWrapper,
3    credentials::EthstatsCredentials,
4    error::EthStatsError,
5    events::{
6        AuthMsg, BlockMsg, BlockStats, HistoryMsg, LatencyMsg, NodeInfo, NodeStats, PayloadMsg,
7        PayloadStats, PendingMsg, PendingStats, PingMsg, StatsMsg, TxStats, UncleStats,
8    },
9};
10use alloy_consensus::{BlockHeader, Sealable};
11use alloy_primitives::U256;
12use reth_chain_state::{CanonStateNotification, CanonStateSubscriptions};
13use reth_network_api::{NetworkInfo, Peers};
14use reth_primitives_traits::{Block, BlockBody};
15use reth_storage_api::{BlockReader, BlockReaderIdExt, NodePrimitivesProvider};
16use reth_transaction_pool::TransactionPool;
17
18use chrono::Local;
19use serde_json::Value;
20use std::{
21    str::FromStr,
22    sync::Arc,
23    time::{Duration, Instant},
24};
25use tokio::{
26    sync::{mpsc, Mutex, RwLock},
27    time::{interval, sleep, timeout},
28};
29use tokio_stream::StreamExt;
30use tokio_tungstenite::connect_async;
31use tracing::{debug, info};
32use url::Url;
33
/// Number of historical blocks to include in a history update sent to the `EthStats` server
/// when the server does not request specific block numbers
const HISTORY_UPDATE_RANGE: u64 = 50;
/// Duration to wait before attempting to reconnect to the `EthStats` server; also used as the
/// idle delay of the read task while no connection is available
const RECONNECT_INTERVAL: Duration = Duration::from_secs(5);
/// Maximum time to wait for a ping response from the server before the connection is dropped
const PING_TIMEOUT: Duration = Duration::from_secs(5);
/// Interval between regular stats reports (ping, block, pending, stats) to the server
const REPORT_INTERVAL: Duration = Duration::from_secs(15);
/// Maximum time to wait for initial connection establishment
const CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
/// Maximum time to wait for the server's response to the login message
const READ_TIMEOUT: Duration = Duration::from_secs(30);
46
/// Main service for interacting with an `EthStats` server
///
/// The service authenticates against the server, then pushes node statistics, block
/// updates and pending-transaction counts over a `WebSocket` connection. When the
/// connection is lost it is re-established by the service loop.
///
/// The connection and ping state live behind `Arc`s, so clones of the service share
/// the same underlying connection.
#[derive(Debug, Clone)]
pub struct EthStatsService<Network, Provider, Pool> {
    /// Authentication credentials (node id, secret, host) for the `EthStats` server
    credentials: EthstatsCredentials,
    /// `WebSocket` connection wrapper; `None` while disconnected. Wrapped in
    /// `Arc<RwLock>` so background tasks can share and tear down the connection
    conn: Arc<RwLock<Option<ConnWrapper>>>,
    /// Timestamp of the last ping sent to the server; cleared once the matching pong
    /// is handled or the ping times out
    last_ping: Arc<Mutex<Option<Instant>>>,
    /// Network interface for getting peer and sync information
    network: Network,
    /// Blockchain provider for reading block data and state
    provider: Provider,
    /// Transaction pool for getting pending transaction statistics
    pool: Pool,
}
68
69impl<Network, Provider, Pool> EthStatsService<Network, Provider, Pool>
70where
71    Network: NetworkInfo + Peers,
72    Provider: BlockReaderIdExt + CanonStateSubscriptions,
73    Pool: TransactionPool,
74{
75    /// Create a new `EthStats` service and establish initial connection
76    ///
77    /// # Arguments
78    /// * `url` - Connection string in format "`node_id:secret@host`"
79    /// * `network` - Network interface implementation
80    /// * `provider` - Blockchain provider implementation
81    /// * `pool` - Transaction pool implementation
82    pub async fn new(
83        url: &str,
84        network: Network,
85        provider: Provider,
86        pool: Pool,
87    ) -> Result<Self, EthStatsError> {
88        let credentials = EthstatsCredentials::from_str(url)?;
89        let service = Self {
90            credentials,
91            conn: Arc::new(RwLock::new(None)),
92            last_ping: Arc::new(Mutex::new(None)),
93            network,
94            provider,
95            pool,
96        };
97        service.connect().await?;
98
99        Ok(service)
100    }
101
102    /// Establish `WebSocket` connection to the `EthStats` server
103    ///
104    /// Attempts to connect to the server using the credentials and handles
105    /// connection timeouts and errors. Uses either `ws://` or `wss://` based
106    /// on the credentials configuration.
107    async fn connect(&self) -> Result<(), EthStatsError> {
108        debug!(
109            target: "ethstats",
110            "Attempting to connect to EthStats server at {}", self.credentials.host
111        );
112        let protocol = if self.credentials.use_tls { "wss" } else { "ws" };
113        let full_url = format!("{}://{}/api", protocol, self.credentials.host);
114        let url = Url::parse(&full_url).map_err(EthStatsError::Url)?;
115
116        match timeout(CONNECT_TIMEOUT, connect_async(url.as_str())).await {
117            Ok(Ok((ws_stream, _))) => {
118                debug!(
119                    target: "ethstats",
120                    "Successfully connected to EthStats server at {}", self.credentials.host
121                );
122                let conn: ConnWrapper = ConnWrapper::new(ws_stream);
123                *self.conn.write().await = Some(conn);
124                self.login().await?;
125                Ok(())
126            }
127            Ok(Err(e)) => Err(EthStatsError::WebSocket(e)),
128            Err(_) => {
129                debug!(target: "ethstats", "Connection to EthStats server timed out");
130                Err(EthStatsError::Timeout)
131            }
132        }
133    }
134
135    /// Authenticate with the `EthStats` server
136    ///
137    /// Sends authentication credentials and node information to the server
138    /// and waits for a successful acknowledgment.
139    async fn login(&self) -> Result<(), EthStatsError> {
140        debug!(
141            target: "ethstats",
142            "Attempting to login to EthStats server as node_id {}", self.credentials.node_id
143        );
144        let conn = self.conn.read().await;
145        let conn = conn.as_ref().ok_or(EthStatsError::NotConnected)?;
146
147        let network_status = self
148            .network
149            .network_status()
150            .await
151            .map_err(|e| EthStatsError::AuthError(e.to_string()))?;
152        let id = &self.credentials.node_id;
153        let secret = &self.credentials.secret;
154        let protocol = network_status
155            .capabilities
156            .iter()
157            .map(|cap| format!("{}/{}", cap.name, cap.version))
158            .collect::<Vec<_>>()
159            .join(", ");
160        let port = self.network.local_addr().port() as u64;
161
162        let auth = AuthMsg {
163            id: id.clone(),
164            secret: secret.clone(),
165            info: NodeInfo {
166                name: id.clone(),
167                node: network_status.client_version.clone(),
168                port,
169                network: self.network.chain_id().to_string(),
170                protocol,
171                api: "No".to_string(),
172                os: std::env::consts::OS.into(),
173                os_ver: std::env::consts::ARCH.into(),
174                client: "0.1.1".to_string(),
175                history: true,
176            },
177        };
178
179        let message = auth.generate_login_message();
180        conn.write_json(&message).await?;
181
182        let response =
183            timeout(READ_TIMEOUT, conn.read_json()).await.map_err(|_| EthStatsError::Timeout)??;
184
185        if let Some(ack) = response.get("emit") &&
186            ack.get(0) == Some(&Value::String("ready".to_string()))
187        {
188            info!(
189                target: "ethstats",
190                "Login successful to EthStats server as node_id {}", self.credentials.node_id
191            );
192            return Ok(());
193        }
194
195        debug!(target: "ethstats", "Login failed: Unauthorized or unexpected login response");
196        Err(EthStatsError::AuthError("Unauthorized or unexpected login response".into()))
197    }
198
199    /// Report current node statistics to the `EthStats` server
200    ///
201    /// Sends information about the node's current state including sync status,
202    /// peer count, and uptime.
203    async fn report_stats(&self) -> Result<(), EthStatsError> {
204        let conn = self.conn.read().await;
205        let conn = conn.as_ref().ok_or(EthStatsError::NotConnected)?;
206
207        let stats_msg = StatsMsg {
208            id: self.credentials.node_id.clone(),
209            stats: NodeStats {
210                active: true,
211                syncing: self.network.is_syncing(),
212                peers: self.network.num_connected_peers() as u64,
213                gas_price: self.pool.block_info().pending_basefee,
214                uptime: 100,
215            },
216        };
217
218        let message = stats_msg.generate_stats_message();
219        conn.write_json(&message).await?;
220
221        Ok(())
222    }
223
224    /// Send a ping message to the `EthStats` server
225    ///
226    /// Records the ping time and starts a timeout task to detect if the server
227    /// doesn't respond within the expected timeframe.
228    async fn send_ping(&self) -> Result<(), EthStatsError> {
229        let conn = self.conn.read().await;
230        let conn = conn.as_ref().ok_or(EthStatsError::NotConnected)?;
231
232        let ping_time = Instant::now();
233        *self.last_ping.lock().await = Some(ping_time);
234
235        let client_time = Local::now().format("%Y-%m-%d %H:%M:%S%.f %:z %Z").to_string();
236        let ping_msg = PingMsg { id: self.credentials.node_id.clone(), client_time };
237
238        let message = ping_msg.generate_ping_message();
239        conn.write_json(&message).await?;
240
241        // Start ping timeout
242        let active_ping = self.last_ping.clone();
243        let conn_ref = self.conn.clone();
244        tokio::spawn(async move {
245            sleep(PING_TIMEOUT).await;
246            let timed_out = {
247                let mut active = active_ping.lock().await;
248                let timed_out = active.is_some();
249                if timed_out {
250                    *active = None;
251                }
252                timed_out
253            };
254
255            if timed_out {
256                debug!(target: "ethstats", "Ping timeout");
257                // Clear connection to trigger reconnect
258                if let Some(conn) = conn_ref.write().await.take() {
259                    let _ = conn.close().await;
260                }
261            }
262        });
263
264        Ok(())
265    }
266
267    /// Report latency measurement to the `EthStats` server
268    ///
269    /// Calculates the round-trip time from the last ping and sends it to
270    /// the server. This is called when a pong response is received.
271    async fn report_latency(&self) -> Result<(), EthStatsError> {
272        let start = {
273            let mut active = self.last_ping.lock().await;
274            active.take()
275        };
276
277        if let Some(start) = start {
278            let latency = start.elapsed().as_millis() as u64 / 2;
279
280            debug!(target: "ethstats", "Reporting latency: {}ms", latency);
281
282            let latency_msg = LatencyMsg { id: self.credentials.node_id.clone(), latency };
283
284            let message = latency_msg.generate_latency_message();
285            let conn = self.conn.read().await;
286            let conn = conn.as_ref().ok_or(EthStatsError::NotConnected)?;
287            conn.write_json(&message).await?;
288        }
289
290        Ok(())
291    }
292
293    /// Report pending transaction count to the `EthStats` server
294    ///
295    /// Gets the current number of pending transactions from the pool and
296    /// sends this information to the server.
297    async fn report_pending(&self) -> Result<(), EthStatsError> {
298        let conn = self.conn.read().await;
299        let conn = conn.as_ref().ok_or(EthStatsError::NotConnected)?;
300        let pending = self.pool.pool_size().pending as u64;
301
302        debug!(target: "ethstats", "Reporting pending txs: {}", pending);
303
304        let pending_msg =
305            PendingMsg { id: self.credentials.node_id.clone(), stats: PendingStats { pending } };
306
307        let message = pending_msg.generate_pending_message();
308        conn.write_json(&message).await?;
309
310        Ok(())
311    }
312
313    /// Report block information to the `EthStats` server
314    ///
315    /// Fetches block data either from a canonical state notification or
316    /// the current best block, converts it to stats format, and sends
317    /// it to the server.
318    ///
319    /// # Arguments
320    /// * `head` - Optional canonical state notification containing new block info
321    async fn report_block(
322        &self,
323        head: Option<CanonStateNotification<<Provider as NodePrimitivesProvider>::Primitives>>,
324    ) -> Result<(), EthStatsError> {
325        let conn = self.conn.read().await;
326        let conn = conn.as_ref().ok_or(EthStatsError::NotConnected)?;
327
328        let block_number = if let Some(head) = head {
329            head.tip().header().number()
330        } else {
331            self.provider
332                .best_block_number()
333                .map_err(|e| EthStatsError::DataFetchError(e.to_string()))?
334        };
335
336        match self.provider.block_by_id(block_number.into()) {
337            Ok(Some(block)) => {
338                let block_msg = BlockMsg {
339                    id: self.credentials.node_id.clone(),
340                    block: self.block_to_stats(&block)?,
341                };
342
343                debug!(target: "ethstats", "Reporting block: {}", block_number);
344
345                let message = block_msg.generate_block_message();
346                conn.write_json(&message).await?;
347            }
348            Ok(None) => {
349                // Block not found, stop fetching
350                debug!(target: "ethstats", "Block {} not found", block_number);
351                return Err(EthStatsError::BlockNotFound(block_number));
352            }
353            Err(e) => {
354                debug!(target: "ethstats", "Error fetching block {}: {}", block_number, e);
355                return Err(EthStatsError::DataFetchError(e.to_string()));
356            }
357        };
358
359        Ok(())
360    }
361
362    /// Report new payload information to the `EthStats` server
363    ///
364    /// Sends information about payload processing time and block details
365    /// to the server for monitoring purposes.
366    pub async fn report_new_payload(
367        &self,
368        block_hash: alloy_primitives::B256,
369        block_number: u64,
370        processing_time: Duration,
371    ) -> Result<(), EthStatsError> {
372        let conn = self.conn.read().await;
373        let conn = conn.as_ref().ok_or(EthStatsError::NotConnected)?;
374
375        let payload_stats = PayloadStats {
376            number: U256::from(block_number),
377            hash: block_hash,
378            processing_time: processing_time.as_millis() as u64,
379        };
380
381        let payload_msg =
382            PayloadMsg { id: self.credentials.node_id.clone(), payload: payload_stats };
383
384        debug!(
385            target: "ethstats",
386            "Reporting new payload: block={}, hash={:?}, processing_time={}ms",
387            block_number,
388            block_hash,
389            processing_time.as_millis()
390        );
391
392        let message = payload_msg.generate_new_payload_message();
393        conn.write_json(&message).await?;
394
395        Ok(())
396    }
397
398    /// Convert a block to `EthStats` block statistics format
399    ///
400    /// Extracts relevant information from a block and formats it according
401    /// to the `EthStats` protocol specification.
402    ///
403    /// # Arguments
404    /// * `block` - The block to convert
405    fn block_to_stats(
406        &self,
407        block: &<Provider as BlockReader>::Block,
408    ) -> Result<BlockStats, EthStatsError> {
409        let body = block.body();
410        let header = block.header();
411
412        let txs = body.transaction_hashes_iter().copied().map(|hash| TxStats { hash }).collect();
413
414        Ok(BlockStats {
415            number: U256::from(header.number()),
416            hash: header.hash_slow(),
417            parent_hash: header.parent_hash(),
418            timestamp: U256::from(header.timestamp()),
419            miner: header.beneficiary(),
420            gas_used: header.gas_used(),
421            gas_limit: header.gas_limit(),
422            diff: header.difficulty().to_string(),
423            total_diff: "0".into(),
424            txs,
425            tx_root: header.transactions_root(),
426            root: header.state_root(),
427            uncles: UncleStats(vec![]),
428        })
429    }
430
431    /// Report historical block data to the `EthStats` server
432    ///
433    /// Fetches multiple blocks by their numbers and sends their statistics
434    /// to the server. This is typically called in response to a history
435    /// request from the server.
436    ///
437    /// # Arguments
438    /// * `list` - Vector of block numbers to fetch and report
439    async fn report_history(&self, list: Option<&Vec<u64>>) -> Result<(), EthStatsError> {
440        let conn = self.conn.read().await;
441        let conn = conn.as_ref().ok_or(EthStatsError::NotConnected)?;
442
443        let indexes = if let Some(list) = list {
444            list
445        } else {
446            let best_block_number = self
447                .provider
448                .best_block_number()
449                .map_err(|e| EthStatsError::DataFetchError(e.to_string()))?;
450
451            let start = best_block_number.saturating_sub(HISTORY_UPDATE_RANGE);
452
453            &(start..=best_block_number).collect()
454        };
455
456        let mut blocks = Vec::with_capacity(indexes.len());
457        for &block_number in indexes {
458            match self.provider.block_by_id(block_number.into()) {
459                Ok(Some(block)) => {
460                    blocks.push(block);
461                }
462                Ok(None) => {
463                    // Block not found, stop fetching
464                    debug!(target: "ethstats", "Block {} not found", block_number);
465                    break;
466                }
467                Err(e) => {
468                    debug!(target: "ethstats", "Error fetching block {}: {}", block_number, e);
469                    break;
470                }
471            }
472        }
473
474        let history: Vec<BlockStats> =
475            blocks.iter().map(|block| self.block_to_stats(block)).collect::<Result<_, _>>()?;
476
477        if history.is_empty() {
478            debug!(target: "ethstats", "No history to send to stats server");
479        } else {
480            debug!(
481                target: "ethstats",
482                "Sending historical blocks to ethstats, first: {}, last: {}",
483                history.first().unwrap().number,
484                history.last().unwrap().number
485            );
486        }
487
488        let history_msg = HistoryMsg { id: self.credentials.node_id.clone(), history };
489
490        let message = history_msg.generate_history_message();
491        conn.write_json(&message).await?;
492
493        Ok(())
494    }
495
496    /// Send a complete status report to the `EthStats` server
497    ///
498    /// Performs all regular reporting tasks: ping, block info, pending
499    /// transactions, and general statistics.
500    async fn report(&self) -> Result<(), EthStatsError> {
501        self.send_ping().await?;
502        self.report_block(None).await?;
503        self.report_pending().await?;
504        self.report_stats().await?;
505
506        Ok(())
507    }
508
509    /// Handle incoming messages from the `EthStats` server
510    ///
511    /// # Expected Message Variants
512    ///
513    /// This function expects messages in the following format:
514    ///
515    /// ```json
516    /// { "emit": [<command: String>, <payload: Object>] }
517    /// ```
518    ///
519    /// ## Supported Commands:
520    ///
521    /// - `"node-pong"`: Indicates a pong response to a previously sent ping. The payload is
522    ///   ignored. Triggers a latency report to the server.
523    ///   - Example: ```json { "emit": [ "node-pong", { "clientTime": "2025-07-10 12:00:00.123
524    ///     +00:00 UTC", "serverTime": "2025-07-10 12:00:01.456 +00:00 UTC" } ] } ```
525    ///
526    /// - `"history"`: Requests historical block data. The payload may contain a `list` field with
527    ///   block numbers to fetch. If `list` is not present, the default range is used.
528    ///   - Example with list: `{ "emit": ["history", {"list": [1, 2, 3], "min": 1, "max": 3}] }`
529    ///   - Example without list: `{ "emit": ["history", {}] }`
530    ///
531    /// ## Other Commands:
532    ///
533    /// Any other command is logged as unhandled and ignored.
534    async fn handle_message(&self, msg: Value) -> Result<(), EthStatsError> {
535        let emit = match msg.get("emit") {
536            Some(emit) => emit,
537            None => {
538                debug!(target: "ethstats", "Stats server sent non-broadcast, msg {}", msg);
539                return Err(EthStatsError::InvalidRequest);
540            }
541        };
542
543        let command = match emit.get(0) {
544            Some(Value::String(command)) => command.as_str(),
545            _ => {
546                debug!(target: "ethstats", "Invalid stats server message type, msg {}", msg);
547                return Err(EthStatsError::InvalidRequest);
548            }
549        };
550
551        match command {
552            "node-pong" => {
553                self.report_latency().await?;
554            }
555            "history" => {
556                let block_numbers = emit
557                    .get(1)
558                    .and_then(|v| v.as_object())
559                    .and_then(|obj| obj.get("list"))
560                    .and_then(|v| v.as_array());
561
562                if block_numbers.is_none() {
563                    self.report_history(None).await?;
564
565                    return Ok(());
566                }
567
568                let block_numbers = block_numbers
569                    .unwrap()
570                    .iter()
571                    .map(|val| {
572                        val.as_u64().ok_or_else(|| {
573                            debug!(
574                                target: "ethstats",
575                                "Invalid stats history block number, msg {}", msg
576                            );
577                            EthStatsError::InvalidRequest
578                        })
579                    })
580                    .collect::<Result<_, _>>()?;
581
582                self.report_history(Some(&block_numbers)).await?;
583            }
584            other => debug!(target: "ethstats", "Unhandled command: {}", other),
585        }
586
587        Ok(())
588    }
589
590    /// Main service loop that handles all `EthStats` communication
591    ///
592    /// This method runs the main event loop that:
593    /// - Maintains the `WebSocket` connection
594    /// - Handles incoming messages from the server
595    /// - Reports statistics at regular intervals
596    /// - Processes new block notifications
597    /// - Automatically reconnects when the connection is lost
598    ///
599    /// The service runs until explicitly shut down or an unrecoverable
600    /// error occurs.
601    pub async fn run(self) {
602        // Create channels for internal communication
603        let (shutdown_tx, mut shutdown_rx) = mpsc::channel(1);
604        let (message_tx, mut message_rx) = mpsc::channel(32);
605        let (head_tx, mut head_rx) = mpsc::channel(10);
606
607        // Start the read loop in a separate task
608        let read_handle = {
609            let conn_arc = self.conn.clone();
610            let message_tx = message_tx.clone();
611            let shutdown_tx = shutdown_tx.clone();
612
613            tokio::spawn(async move {
614                loop {
615                    let conn_guard = conn_arc.read().await;
616                    if let Some(conn) = conn_guard.as_ref() {
617                        match conn.read_json().await {
618                            Ok(msg) => {
619                                if message_tx.send(msg).await.is_err() {
620                                    break;
621                                }
622                            }
623                            Err(e) => match e {
624                                crate::error::ConnectionError::Serialization(err) => {
625                                    debug!(target: "ethstats", "JSON parse error from stats server: {}", err);
626                                }
627                                other => {
628                                    debug!(target: "ethstats", "Read error: {}", other);
629                                    drop(conn_guard);
630                                    if let Some(conn) = conn_arc.write().await.take() {
631                                        let _ = conn.close().await;
632                                    }
633                                }
634                            },
635                        }
636                    } else {
637                        sleep(RECONNECT_INTERVAL).await;
638                    }
639                }
640
641                let _ = shutdown_tx.send(()).await;
642            })
643        };
644
645        let canonical_stream_handle = {
646            let mut canonical_stream = self.provider.canonical_state_stream();
647            let head_tx = head_tx.clone();
648            let shutdown_tx = shutdown_tx.clone();
649
650            tokio::spawn(async move {
651                loop {
652                    let head = canonical_stream.next().await;
653                    if let Some(head) = head &&
654                        head_tx.send(head).await.is_err()
655                    {
656                        break;
657                    }
658                }
659
660                let _ = shutdown_tx.send(()).await;
661            })
662        };
663
664        let mut pending_tx_receiver = self.pool.pending_transactions_listener();
665
666        // Set up intervals
667        let mut report_interval = interval(REPORT_INTERVAL);
668        let mut reconnect_interval = interval(RECONNECT_INTERVAL);
669
670        // Main event loop using select!
671        loop {
672            tokio::select! {
673                // Handle shutdown signal
674                _ = shutdown_rx.recv() => {
675                    info!(target: "ethstats", "Shutting down ethstats service");
676                    break;
677                }
678
679                // Handle messages from the read loop
680                Some(msg) = message_rx.recv() => {
681                    if let Err(e) = self.handle_message(msg).await {
682                        debug!(target: "ethstats", "Error handling message: {}", e);
683                        self.disconnect().await;
684                    }
685                }
686
687                // Handle new block
688                Some(head) = head_rx.recv() => {
689                    if let Err(e) = self.report_block(Some(head)).await {
690                        debug!(target: "ethstats", "Failed to report block: {}", e);
691                        self.disconnect().await;
692                    }
693
694                    if let Err(e) = self.report_pending().await {
695                        debug!(target: "ethstats", "Failed to report pending: {}", e);
696                        self.disconnect().await;
697                    }
698                }
699
700                // Handle new pending tx
701                _= pending_tx_receiver.recv() => {
702                    if let Err(e) = self.report_pending().await {
703                        debug!(target: "ethstats", "Failed to report pending: {}", e);
704                        self.disconnect().await;
705                    }
706                }
707
708                // Handle stats reporting
709                _ = report_interval.tick() => {
710                    if let Err(e) = self.report().await {
711                        debug!(target: "ethstats", "Failed to report: {}", e);
712                        self.disconnect().await;
713                    }
714                }
715
716                // Handle reconnection
717                _ = reconnect_interval.tick() => {
718                    if self.conn.read().await.is_none() {
719                        match self.connect().await {
720                            Ok(_) => info!(target: "ethstats", "Reconnected successfully"),
721                            Err(e) => debug!(target: "ethstats", "Reconnect failed: {}", e),
722                        }
723                    }
724                }
725            }
726        }
727
728        // Cleanup
729        self.disconnect().await;
730
731        // Cancel background tasks
732        read_handle.abort();
733        canonical_stream_handle.abort();
734    }
735
736    /// Gracefully close the `WebSocket` connection
737    ///
738    /// Attempts to close the connection cleanly and logs any errors
739    /// that occur during the process.
740    async fn disconnect(&self) {
741        if let Some(conn) = self.conn.write().await.take() &&
742            let Err(e) = conn.close().await
743        {
744            debug!(target: "ethstats", "Error closing connection: {}", e);
745        }
746    }
747
748    /// Test helper to check connection status
749    #[cfg(test)]
750    pub async fn is_connected(&self) -> bool {
751        self.conn.read().await.is_some()
752    }
753}
754
#[cfg(test)]
mod tests {
    use super::*;
    use futures_util::{SinkExt, StreamExt};
    use reth_network_api::noop::NoopNetwork;
    use reth_storage_api::noop::NoopProvider;
    use reth_transaction_pool::noop::NoopTransactionPool;
    use serde_json::json;
    use tokio::{net::TcpListener, sync::Notify};
    use tokio_tungstenite::tungstenite::protocol::{frame::Utf8Bytes, Message};

    /// Loopback address the mock server binds to.
    const TEST_HOST: &str = "127.0.0.1";
    /// Port `0` lets the OS choose a free port.
    const TEST_PORT: u16 = 0;

    /// Serializes a JSON value and wraps it in a websocket text frame.
    fn text_frame(body: serde_json::Value) -> Message {
        Message::Text(Utf8Bytes::from(body.to_string()))
    }

    /// Starts a single-connection mock websocket server on an OS-assigned port.
    ///
    /// The spawned task accepts one client, answers a leading `hello` emit with `ready`, and
    /// afterwards answers every text frame containing `node-ping` with a `node-pong`.
    ///
    /// Returns the bound socket address (as a string) and the server task's join handle.
    async fn setup_mock_server() -> (String, tokio::task::JoinHandle<()>) {
        let tcp_listener = TcpListener::bind((TEST_HOST, TEST_PORT)).await.unwrap();
        let bound_addr = tcp_listener.local_addr().unwrap();

        let server_task = tokio::spawn(async move {
            let (tcp, _peer) = tcp_listener.accept().await.unwrap();
            let mut socket = tokio_tungstenite::accept_async(tcp).await.unwrap();

            // Login phase: only the first frame is inspected.
            if let Some(Ok(Message::Text(login))) = socket.next().await {
                let parsed: serde_json::Value = serde_json::from_str(&login).unwrap();
                if parsed["emit"][0] == "hello" {
                    socket.send(text_frame(json!({ "emit": ["ready", []] }))).await.unwrap();
                }
            }

            // Ping phase: runs until the stream ends or yields an error.
            while let Some(Ok(frame)) = socket.next().await {
                let Message::Text(payload) = frame else { continue };
                if !payload.contains("node-ping") {
                    continue;
                }

                let pong = json!({ "emit": ["node-pong", {"id": "test-node"}] });
                socket.send(text_frame(pong)).await.unwrap();
            }
        });

        (bound_addr.to_string(), server_task)
    }

    /// Connecting to the mock server should leave the service holding a connection.
    #[tokio::test]
    async fn test_connection_and_login() {
        let (server_addr, server_task) = setup_mock_server().await;
        let url = format!("test-node:test-secret@{server_addr}");

        let service = EthStatsService::new(
            &url,
            NoopNetwork::default(),
            NoopProvider::default(),
            NoopTransactionPool::default(),
        )
        .await
        .expect("Service should connect");

        // The connection slot must be populated once `new` has returned.
        assert!(service.is_connected().await, "Service should be connected");

        // Tear down the mock server.
        server_task.abort();
    }

    /// A `history` emit passed to `handle_message` should be processed without error.
    #[tokio::test]
    async fn test_history_command_handling() {
        let (server_addr, server_task) = setup_mock_server().await;
        let url = format!("test-node:test-secret@{server_addr}");

        let service = EthStatsService::new(
            &url,
            NoopNetwork::default(),
            NoopProvider::default(),
            NoopTransactionPool::default(),
        )
        .await
        .expect("Service should connect");

        // Feed the handler the same shape of message a server-side history request would have.
        service
            .handle_message(json!({ "emit": ["history", {"list": [1, 2, 3]}] }))
            .await
            .expect("History command should be handled");

        // Tear down the mock server.
        server_task.abort();
    }

    /// Malformed connection strings should be rejected with `EthStatsError::InvalidUrl`.
    #[tokio::test]
    async fn test_invalid_url_handling() {
        let network = NoopNetwork::default();
        let provider = NoopProvider::default();
        let pool = NoopTransactionPool::default();

        // First entry is missing the secret; the second is not a valid URL format at all.
        for bad_url in ["test-node@localhost", "invalid-url"] {
            let outcome =
                EthStatsService::new(bad_url, network.clone(), provider.clone(), pool.clone())
                    .await;
            assert!(
                matches!(outcome, Err(EthStatsError::InvalidUrl(_))),
                "Should detect invalid URL format"
            );
        }
    }

    /// Regression test for lock ordering between `last_ping` and `conn` in `report_latency`.
    ///
    /// While this test holds the `last_ping` mutex, a spawned `report_latency` call must not
    /// keep `conn` locked, so taking `conn.write()` has to succeed within the timeout.
    #[tokio::test(flavor = "current_thread")]
    async fn report_latency_lock_order_regression() {
        // A live connection is needed so the pong handler (report_latency) could grab
        // conn.read().
        let (server_addr, server_task) = setup_mock_server().await;
        let url = format!("test-node:test-secret@{server_addr}");

        let service = EthStatsService::new(
            &url,
            NoopNetwork::default(),
            NoopProvider::default(),
            NoopTransactionPool::default(),
        )
        .await
        .expect("Service should connect");

        // Hold last_ping with a value set, mimicking an outstanding ping while a pong is being
        // handled.
        let mut pending_ping = service.last_ping.lock().await;
        *pending_ping = Some(Instant::now());

        let task_started = Arc::new(Notify::new());
        let pong_task = tokio::spawn({
            let task_started = Arc::clone(&task_started);
            let service = service.clone();
            async move {
                task_started.notify_one();
                let _ = service.report_latency().await;
            }
        });

        // Give the pong handler a chance to start and (in the unfixed version) hold
        // conn.read().
        task_started.notified().await;
        tokio::task::yield_now().await;

        // Stands in for the timeout task taking conn.write() to close the connection. With the
        // unfixed lock order this would block because report_latency holds conn.read().
        let conn_write = tokio::time::timeout(Duration::from_millis(100), service.conn.write())
            .await
            .expect(
                "conn write lock should not be held while report_latency waits on last_ping",
            );

        drop(conn_write);
        drop(pending_ping);

        let _ = pong_task.await;
        server_task.abort();
    }
}