1use crate::{
2 connection::ConnWrapper,
3 credentials::EthstatsCredentials,
4 error::EthStatsError,
5 events::{
6 AuthMsg, BlockMsg, BlockStats, HistoryMsg, LatencyMsg, NodeInfo, NodeStats, PayloadMsg,
7 PayloadStats, PendingMsg, PendingStats, PingMsg, StatsMsg, TxStats, UncleStats,
8 },
9};
10use alloy_consensus::{BlockHeader, Sealable};
11use alloy_primitives::U256;
12use reth_chain_state::{CanonStateNotification, CanonStateSubscriptions};
13use reth_network_api::{NetworkInfo, Peers};
14use reth_primitives_traits::{Block, BlockBody};
15use reth_storage_api::{BlockReader, BlockReaderIdExt, NodePrimitivesProvider};
16use reth_transaction_pool::TransactionPool;
17
18use chrono::Local;
19use serde_json::Value;
20use std::{
21 str::FromStr,
22 sync::Arc,
23 time::{Duration, Instant},
24};
25use tokio::{
26 sync::{mpsc, Mutex, RwLock},
27 time::{interval, sleep, timeout},
28};
29use tokio_stream::StreamExt;
30use tokio_tungstenite::connect_async;
31use tracing::{debug, info};
32use url::Url;
33
/// How far back from the best block a history report reaches when the server does not ask for
/// specific block numbers (the range is `best - HISTORY_UPDATE_RANGE ..= best`).
const HISTORY_UPDATE_RANGE: u64 = 50;
/// Period of the reconnect check in the main loop; also how long the reader task sleeps while no
/// connection is stored.
const RECONNECT_INTERVAL: Duration = Duration::from_secs(5);
/// Time allowed to pass after a ping is sent before the ping is considered timed out.
const PING_TIMEOUT: Duration = Duration::from_secs(5);
/// Period of the full report (ping, latest block, pending transactions, node stats).
const REPORT_INTERVAL: Duration = Duration::from_secs(15);
/// Maximum time allowed for establishing the WebSocket connection.
const CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
/// Maximum time to wait for the server's reply to the login message.
const READ_TIMEOUT: Duration = Duration::from_secs(30);
46
/// Service that reports node, block and transaction-pool statistics to an EthStats server over a
/// WebSocket connection.
///
/// The connection and ping state live behind `Arc`s, so clones of the service share them.
#[derive(Debug, Clone)]
pub struct EthStatsService<Network, Provider, Pool> {
    /// Host, node id, secret and TLS flag parsed from the URL passed to `new`.
    credentials: EthstatsCredentials,
    /// Current WebSocket connection, or `None` while disconnected.
    conn: Arc<RwLock<Option<ConnWrapper>>>,
    /// Send time of the ping that is still waiting for its pong, if any.
    last_ping: Arc<Mutex<Option<Instant>>>,
    /// Source of network status, peer count, sync state, chain id and local address.
    network: Network,
    /// Source of blocks and canonical-state notifications.
    provider: Provider,
    /// Source of pending-transaction counts and the pending base fee.
    pool: Pool,
}
68
69impl<Network, Provider, Pool> EthStatsService<Network, Provider, Pool>
70where
71 Network: NetworkInfo + Peers,
72 Provider: BlockReaderIdExt + CanonStateSubscriptions,
73 Pool: TransactionPool,
74{
75 pub async fn new(
83 url: &str,
84 network: Network,
85 provider: Provider,
86 pool: Pool,
87 ) -> Result<Self, EthStatsError> {
88 let credentials = EthstatsCredentials::from_str(url)?;
89 let service = Self {
90 credentials,
91 conn: Arc::new(RwLock::new(None)),
92 last_ping: Arc::new(Mutex::new(None)),
93 network,
94 provider,
95 pool,
96 };
97 service.connect().await?;
98
99 Ok(service)
100 }
101
102 async fn connect(&self) -> Result<(), EthStatsError> {
108 debug!(
109 target: "ethstats",
110 "Attempting to connect to EthStats server at {}", self.credentials.host
111 );
112 let protocol = if self.credentials.use_tls { "wss" } else { "ws" };
113 let full_url = format!("{}://{}/api", protocol, self.credentials.host);
114 let url = Url::parse(&full_url).map_err(EthStatsError::Url)?;
115
116 match timeout(CONNECT_TIMEOUT, connect_async(url.as_str())).await {
117 Ok(Ok((ws_stream, _))) => {
118 debug!(
119 target: "ethstats",
120 "Successfully connected to EthStats server at {}", self.credentials.host
121 );
122 let conn: ConnWrapper = ConnWrapper::new(ws_stream);
123 *self.conn.write().await = Some(conn);
124 self.login().await?;
125 Ok(())
126 }
127 Ok(Err(e)) => Err(EthStatsError::WebSocket(e)),
128 Err(_) => {
129 debug!(target: "ethstats", "Connection to EthStats server timed out");
130 Err(EthStatsError::Timeout)
131 }
132 }
133 }
134
135 async fn login(&self) -> Result<(), EthStatsError> {
140 debug!(
141 target: "ethstats",
142 "Attempting to login to EthStats server as node_id {}", self.credentials.node_id
143 );
144 let conn = self.conn.read().await;
145 let conn = conn.as_ref().ok_or(EthStatsError::NotConnected)?;
146
147 let network_status = self
148 .network
149 .network_status()
150 .await
151 .map_err(|e| EthStatsError::AuthError(e.to_string()))?;
152 let id = &self.credentials.node_id;
153 let secret = &self.credentials.secret;
154 let protocol = network_status
155 .capabilities
156 .iter()
157 .map(|cap| format!("{}/{}", cap.name, cap.version))
158 .collect::<Vec<_>>()
159 .join(", ");
160 let port = self.network.local_addr().port() as u64;
161
162 let auth = AuthMsg {
163 id: id.clone(),
164 secret: secret.clone(),
165 info: NodeInfo {
166 name: id.clone(),
167 node: network_status.client_version.clone(),
168 port,
169 network: self.network.chain_id().to_string(),
170 protocol,
171 api: "No".to_string(),
172 os: std::env::consts::OS.into(),
173 os_ver: std::env::consts::ARCH.into(),
174 client: "0.1.1".to_string(),
175 history: true,
176 },
177 };
178
179 let message = auth.generate_login_message();
180 conn.write_json(&message).await?;
181
182 let response =
183 timeout(READ_TIMEOUT, conn.read_json()).await.map_err(|_| EthStatsError::Timeout)??;
184
185 if let Some(ack) = response.get("emit") &&
186 ack.get(0) == Some(&Value::String("ready".to_string()))
187 {
188 info!(
189 target: "ethstats",
190 "Login successful to EthStats server as node_id {}", self.credentials.node_id
191 );
192 return Ok(());
193 }
194
195 debug!(target: "ethstats", "Login failed: Unauthorized or unexpected login response");
196 Err(EthStatsError::AuthError("Unauthorized or unexpected login response".into()))
197 }
198
199 async fn report_stats(&self) -> Result<(), EthStatsError> {
204 let conn = self.conn.read().await;
205 let conn = conn.as_ref().ok_or(EthStatsError::NotConnected)?;
206
207 let stats_msg = StatsMsg {
208 id: self.credentials.node_id.clone(),
209 stats: NodeStats {
210 active: true,
211 syncing: self.network.is_syncing(),
212 peers: self.network.num_connected_peers() as u64,
213 gas_price: self.pool.block_info().pending_basefee,
214 uptime: 100,
215 },
216 };
217
218 let message = stats_msg.generate_stats_message();
219 conn.write_json(&message).await?;
220
221 Ok(())
222 }
223
224 async fn send_ping(&self) -> Result<(), EthStatsError> {
229 let conn = self.conn.read().await;
230 let conn = conn.as_ref().ok_or(EthStatsError::NotConnected)?;
231
232 let ping_time = Instant::now();
233 *self.last_ping.lock().await = Some(ping_time);
234
235 let client_time = Local::now().format("%Y-%m-%d %H:%M:%S%.f %:z %Z").to_string();
236 let ping_msg = PingMsg { id: self.credentials.node_id.clone(), client_time };
237
238 let message = ping_msg.generate_ping_message();
239 conn.write_json(&message).await?;
240
241 let active_ping = self.last_ping.clone();
243 let conn_ref = self.conn.clone();
244 tokio::spawn(async move {
245 sleep(PING_TIMEOUT).await;
246 let timed_out = {
247 let mut active = active_ping.lock().await;
248 let timed_out = active.is_some();
249 if timed_out {
250 *active = None;
251 }
252 timed_out
253 };
254
255 if timed_out {
256 debug!(target: "ethstats", "Ping timeout");
257 if let Some(conn) = conn_ref.write().await.take() {
259 let _ = conn.close().await;
260 }
261 }
262 });
263
264 Ok(())
265 }
266
267 async fn report_latency(&self) -> Result<(), EthStatsError> {
272 let start = {
273 let mut active = self.last_ping.lock().await;
274 active.take()
275 };
276
277 if let Some(start) = start {
278 let latency = start.elapsed().as_millis() as u64 / 2;
279
280 debug!(target: "ethstats", "Reporting latency: {}ms", latency);
281
282 let latency_msg = LatencyMsg { id: self.credentials.node_id.clone(), latency };
283
284 let message = latency_msg.generate_latency_message();
285 let conn = self.conn.read().await;
286 let conn = conn.as_ref().ok_or(EthStatsError::NotConnected)?;
287 conn.write_json(&message).await?;
288 }
289
290 Ok(())
291 }
292
293 async fn report_pending(&self) -> Result<(), EthStatsError> {
298 let conn = self.conn.read().await;
299 let conn = conn.as_ref().ok_or(EthStatsError::NotConnected)?;
300 let pending = self.pool.pool_size().pending as u64;
301
302 debug!(target: "ethstats", "Reporting pending txs: {}", pending);
303
304 let pending_msg =
305 PendingMsg { id: self.credentials.node_id.clone(), stats: PendingStats { pending } };
306
307 let message = pending_msg.generate_pending_message();
308 conn.write_json(&message).await?;
309
310 Ok(())
311 }
312
313 async fn report_block(
322 &self,
323 head: Option<CanonStateNotification<<Provider as NodePrimitivesProvider>::Primitives>>,
324 ) -> Result<(), EthStatsError> {
325 let conn = self.conn.read().await;
326 let conn = conn.as_ref().ok_or(EthStatsError::NotConnected)?;
327
328 let block_number = if let Some(head) = head {
329 head.tip().header().number()
330 } else {
331 self.provider
332 .best_block_number()
333 .map_err(|e| EthStatsError::DataFetchError(e.to_string()))?
334 };
335
336 match self.provider.block_by_id(block_number.into()) {
337 Ok(Some(block)) => {
338 let block_msg = BlockMsg {
339 id: self.credentials.node_id.clone(),
340 block: self.block_to_stats(&block)?,
341 };
342
343 debug!(target: "ethstats", "Reporting block: {}", block_number);
344
345 let message = block_msg.generate_block_message();
346 conn.write_json(&message).await?;
347 }
348 Ok(None) => {
349 debug!(target: "ethstats", "Block {} not found", block_number);
351 return Err(EthStatsError::BlockNotFound(block_number));
352 }
353 Err(e) => {
354 debug!(target: "ethstats", "Error fetching block {}: {}", block_number, e);
355 return Err(EthStatsError::DataFetchError(e.to_string()));
356 }
357 };
358
359 Ok(())
360 }
361
362 pub async fn report_new_payload(
367 &self,
368 block_hash: alloy_primitives::B256,
369 block_number: u64,
370 processing_time: Duration,
371 ) -> Result<(), EthStatsError> {
372 let conn = self.conn.read().await;
373 let conn = conn.as_ref().ok_or(EthStatsError::NotConnected)?;
374
375 let payload_stats = PayloadStats {
376 number: U256::from(block_number),
377 hash: block_hash,
378 processing_time: processing_time.as_millis() as u64,
379 };
380
381 let payload_msg =
382 PayloadMsg { id: self.credentials.node_id.clone(), payload: payload_stats };
383
384 debug!(
385 target: "ethstats",
386 "Reporting new payload: block={}, hash={:?}, processing_time={}ms",
387 block_number,
388 block_hash,
389 processing_time.as_millis()
390 );
391
392 let message = payload_msg.generate_new_payload_message();
393 conn.write_json(&message).await?;
394
395 Ok(())
396 }
397
398 fn block_to_stats(
406 &self,
407 block: &<Provider as BlockReader>::Block,
408 ) -> Result<BlockStats, EthStatsError> {
409 let body = block.body();
410 let header = block.header();
411
412 let txs = body.transaction_hashes_iter().copied().map(|hash| TxStats { hash }).collect();
413
414 Ok(BlockStats {
415 number: U256::from(header.number()),
416 hash: header.hash_slow(),
417 parent_hash: header.parent_hash(),
418 timestamp: U256::from(header.timestamp()),
419 miner: header.beneficiary(),
420 gas_used: header.gas_used(),
421 gas_limit: header.gas_limit(),
422 diff: header.difficulty().to_string(),
423 total_diff: "0".into(),
424 txs,
425 tx_root: header.transactions_root(),
426 root: header.state_root(),
427 uncles: UncleStats(vec![]),
428 })
429 }
430
431 async fn report_history(&self, list: Option<&Vec<u64>>) -> Result<(), EthStatsError> {
440 let conn = self.conn.read().await;
441 let conn = conn.as_ref().ok_or(EthStatsError::NotConnected)?;
442
443 let indexes = if let Some(list) = list {
444 list
445 } else {
446 let best_block_number = self
447 .provider
448 .best_block_number()
449 .map_err(|e| EthStatsError::DataFetchError(e.to_string()))?;
450
451 let start = best_block_number.saturating_sub(HISTORY_UPDATE_RANGE);
452
453 &(start..=best_block_number).collect()
454 };
455
456 let mut blocks = Vec::with_capacity(indexes.len());
457 for &block_number in indexes {
458 match self.provider.block_by_id(block_number.into()) {
459 Ok(Some(block)) => {
460 blocks.push(block);
461 }
462 Ok(None) => {
463 debug!(target: "ethstats", "Block {} not found", block_number);
465 break;
466 }
467 Err(e) => {
468 debug!(target: "ethstats", "Error fetching block {}: {}", block_number, e);
469 break;
470 }
471 }
472 }
473
474 let history: Vec<BlockStats> =
475 blocks.iter().map(|block| self.block_to_stats(block)).collect::<Result<_, _>>()?;
476
477 if history.is_empty() {
478 debug!(target: "ethstats", "No history to send to stats server");
479 } else {
480 debug!(
481 target: "ethstats",
482 "Sending historical blocks to ethstats, first: {}, last: {}",
483 history.first().unwrap().number,
484 history.last().unwrap().number
485 );
486 }
487
488 let history_msg = HistoryMsg { id: self.credentials.node_id.clone(), history };
489
490 let message = history_msg.generate_history_message();
491 conn.write_json(&message).await?;
492
493 Ok(())
494 }
495
496 async fn report(&self) -> Result<(), EthStatsError> {
501 self.send_ping().await?;
502 self.report_block(None).await?;
503 self.report_pending().await?;
504 self.report_stats().await?;
505
506 Ok(())
507 }
508
509 async fn handle_message(&self, msg: Value) -> Result<(), EthStatsError> {
535 let emit = match msg.get("emit") {
536 Some(emit) => emit,
537 None => {
538 debug!(target: "ethstats", "Stats server sent non-broadcast, msg {}", msg);
539 return Err(EthStatsError::InvalidRequest);
540 }
541 };
542
543 let command = match emit.get(0) {
544 Some(Value::String(command)) => command.as_str(),
545 _ => {
546 debug!(target: "ethstats", "Invalid stats server message type, msg {}", msg);
547 return Err(EthStatsError::InvalidRequest);
548 }
549 };
550
551 match command {
552 "node-pong" => {
553 self.report_latency().await?;
554 }
555 "history" => {
556 let block_numbers = emit
557 .get(1)
558 .and_then(|v| v.as_object())
559 .and_then(|obj| obj.get("list"))
560 .and_then(|v| v.as_array());
561
562 if block_numbers.is_none() {
563 self.report_history(None).await?;
564
565 return Ok(());
566 }
567
568 let block_numbers = block_numbers
569 .unwrap()
570 .iter()
571 .map(|val| {
572 val.as_u64().ok_or_else(|| {
573 debug!(
574 target: "ethstats",
575 "Invalid stats history block number, msg {}", msg
576 );
577 EthStatsError::InvalidRequest
578 })
579 })
580 .collect::<Result<_, _>>()?;
581
582 self.report_history(Some(&block_numbers)).await?;
583 }
584 other => debug!(target: "ethstats", "Unhandled command: {}", other),
585 }
586
587 Ok(())
588 }
589
590 pub async fn run(self) {
602 let (shutdown_tx, mut shutdown_rx) = mpsc::channel(1);
604 let (message_tx, mut message_rx) = mpsc::channel(32);
605 let (head_tx, mut head_rx) = mpsc::channel(10);
606
607 let read_handle = {
609 let conn_arc = self.conn.clone();
610 let message_tx = message_tx.clone();
611 let shutdown_tx = shutdown_tx.clone();
612
613 tokio::spawn(async move {
614 loop {
615 let conn_guard = conn_arc.read().await;
616 if let Some(conn) = conn_guard.as_ref() {
617 match conn.read_json().await {
618 Ok(msg) => {
619 if message_tx.send(msg).await.is_err() {
620 break;
621 }
622 }
623 Err(e) => match e {
624 crate::error::ConnectionError::Serialization(err) => {
625 debug!(target: "ethstats", "JSON parse error from stats server: {}", err);
626 }
627 other => {
628 debug!(target: "ethstats", "Read error: {}", other);
629 drop(conn_guard);
630 if let Some(conn) = conn_arc.write().await.take() {
631 let _ = conn.close().await;
632 }
633 }
634 },
635 }
636 } else {
637 sleep(RECONNECT_INTERVAL).await;
638 }
639 }
640
641 let _ = shutdown_tx.send(()).await;
642 })
643 };
644
645 let canonical_stream_handle = {
646 let mut canonical_stream = self.provider.canonical_state_stream();
647 let head_tx = head_tx.clone();
648 let shutdown_tx = shutdown_tx.clone();
649
650 tokio::spawn(async move {
651 loop {
652 let head = canonical_stream.next().await;
653 if let Some(head) = head &&
654 head_tx.send(head).await.is_err()
655 {
656 break;
657 }
658 }
659
660 let _ = shutdown_tx.send(()).await;
661 })
662 };
663
664 let mut pending_tx_receiver = self.pool.pending_transactions_listener();
665
666 let mut report_interval = interval(REPORT_INTERVAL);
668 let mut reconnect_interval = interval(RECONNECT_INTERVAL);
669
670 loop {
672 tokio::select! {
673 _ = shutdown_rx.recv() => {
675 info!(target: "ethstats", "Shutting down ethstats service");
676 break;
677 }
678
679 Some(msg) = message_rx.recv() => {
681 if let Err(e) = self.handle_message(msg).await {
682 debug!(target: "ethstats", "Error handling message: {}", e);
683 self.disconnect().await;
684 }
685 }
686
687 Some(head) = head_rx.recv() => {
689 if let Err(e) = self.report_block(Some(head)).await {
690 debug!(target: "ethstats", "Failed to report block: {}", e);
691 self.disconnect().await;
692 }
693
694 if let Err(e) = self.report_pending().await {
695 debug!(target: "ethstats", "Failed to report pending: {}", e);
696 self.disconnect().await;
697 }
698 }
699
700 _= pending_tx_receiver.recv() => {
702 if let Err(e) = self.report_pending().await {
703 debug!(target: "ethstats", "Failed to report pending: {}", e);
704 self.disconnect().await;
705 }
706 }
707
708 _ = report_interval.tick() => {
710 if let Err(e) = self.report().await {
711 debug!(target: "ethstats", "Failed to report: {}", e);
712 self.disconnect().await;
713 }
714 }
715
716 _ = reconnect_interval.tick() => {
718 if self.conn.read().await.is_none() {
719 match self.connect().await {
720 Ok(_) => info!(target: "ethstats", "Reconnected successfully"),
721 Err(e) => debug!(target: "ethstats", "Reconnect failed: {}", e),
722 }
723 }
724 }
725 }
726 }
727
728 self.disconnect().await;
730
731 read_handle.abort();
733 canonical_stream_handle.abort();
734 }
735
736 async fn disconnect(&self) {
741 if let Some(conn) = self.conn.write().await.take() &&
742 let Err(e) = conn.close().await
743 {
744 debug!(target: "ethstats", "Error closing connection: {}", e);
745 }
746 }
747
748 #[cfg(test)]
750 pub async fn is_connected(&self) -> bool {
751 self.conn.read().await.is_some()
752 }
753}
754
755#[cfg(test)]
756mod tests {
757 use super::*;
758 use futures_util::{SinkExt, StreamExt};
759 use reth_network_api::noop::NoopNetwork;
760 use reth_storage_api::noop::NoopProvider;
761 use reth_transaction_pool::noop::NoopTransactionPool;
762 use serde_json::json;
763 use tokio::{net::TcpListener, sync::Notify};
764 use tokio_tungstenite::tungstenite::protocol::{frame::Utf8Bytes, Message};
765
    /// Address the mock server binds to.
    const TEST_HOST: &str = "127.0.0.1";
    /// Port passed to `bind`; the address actually bound is read back from the listener.
    const TEST_PORT: u16 = 0;

    /// Starts a single-connection mock EthStats server.
    ///
    /// Returns the bound address as `host:port` together with the handle of the server task.
    /// The server answers a first text message whose `emit[0]` is `"hello"` with a `ready`
    /// message, and afterwards answers every text message containing `node-ping` with a
    /// `node-pong`.
    async fn setup_mock_server() -> (String, tokio::task::JoinHandle<()>) {
        let listener = TcpListener::bind((TEST_HOST, TEST_PORT)).await.unwrap();
        let addr = listener.local_addr().unwrap();

        let handle = tokio::spawn(async move {
            // Only the first incoming connection is served.
            let (stream, _) = listener.accept().await.unwrap();
            let mut ws_stream = tokio_tungstenite::accept_async(stream).await.unwrap();

            // Login handshake: acknowledge a `hello` with `ready`.
            if let Some(Ok(Message::Text(text))) = ws_stream.next().await {
                let value: serde_json::Value = serde_json::from_str(&text).unwrap();
                if value["emit"][0] == "hello" {
                    let response = json!({
                        "emit": ["ready", []]
                    });
                    ws_stream
                        .send(Message::Text(Utf8Bytes::from(response.to_string())))
                        .await
                        .unwrap();
                }
            }

            // Afterwards reply to pings until the client goes away or a read fails.
            while let Some(Ok(msg)) = ws_stream.next().await {
                if let Message::Text(text) = msg &&
                    text.contains("node-ping")
                {
                    let pong = json!({
                        "emit": ["node-pong", {"id": "test-node"}]
                    });
                    ws_stream.send(Message::Text(Utf8Bytes::from(pong.to_string()))).await.unwrap();
                }
            }
        });

        (addr.to_string(), handle)
    }
806
807 #[tokio::test]
808 async fn test_connection_and_login() {
809 let (server_url, server_handle) = setup_mock_server().await;
810 let ethstats_url = format!("test-node:test-secret@{server_url}");
811
812 let network = NoopNetwork::default();
813 let provider = NoopProvider::default();
814 let pool = NoopTransactionPool::default();
815
816 let service = EthStatsService::new(ðstats_url, network, provider, pool)
817 .await
818 .expect("Service should connect");
819
820 assert!(service.is_connected().await, "Service should be connected");
822
823 server_handle.abort();
825 }
826
827 #[tokio::test]
828 async fn test_history_command_handling() {
829 let (server_url, server_handle) = setup_mock_server().await;
830 let ethstats_url = format!("test-node:test-secret@{server_url}");
831
832 let network = NoopNetwork::default();
833 let provider = NoopProvider::default();
834 let pool = NoopTransactionPool::default();
835
836 let service = EthStatsService::new(ðstats_url, network, provider, pool)
837 .await
838 .expect("Service should connect");
839
840 let history_cmd = json!({
842 "emit": ["history", {"list": [1, 2, 3]}]
843 });
844
845 service.handle_message(history_cmd).await.expect("History command should be handled");
846
847 server_handle.abort();
849 }
850
851 #[tokio::test]
852 async fn test_invalid_url_handling() {
853 let network = NoopNetwork::default();
854 let provider = NoopProvider::default();
855 let pool = NoopTransactionPool::default();
856
857 let result = EthStatsService::new(
859 "test-node@localhost",
860 network.clone(),
861 provider.clone(),
862 pool.clone(),
863 )
864 .await;
865 assert!(
866 matches!(result, Err(EthStatsError::InvalidUrl(_))),
867 "Should detect invalid URL format"
868 );
869
870 let result = EthStatsService::new("invalid-url", network, provider, pool).await;
872 assert!(
873 matches!(result, Err(EthStatsError::InvalidUrl(_))),
874 "Should detect invalid URL format"
875 );
876 }
877
878 #[tokio::test(flavor = "current_thread")]
879 async fn report_latency_lock_order_regression() {
880 let (server_url, server_handle) = setup_mock_server().await;
882 let ethstats_url = format!("test-node:test-secret@{server_url}");
883
884 let network = NoopNetwork::default();
885 let provider = NoopProvider::default();
886 let pool = NoopTransactionPool::default();
887
888 let service = EthStatsService::new(ðstats_url, network, provider, pool)
889 .await
890 .expect("Service should connect");
891
892 let mut last_ping_guard = service.last_ping.lock().await;
894 *last_ping_guard = Some(Instant::now());
895
896 let started = Arc::new(Notify::new());
897 let started_clone = started.clone();
898 let service_clone = service.clone();
899 let handle = tokio::spawn(async move {
900 started_clone.notify_one();
901 let _ = service_clone.report_latency().await;
902 });
903
904 started.notified().await;
906 tokio::task::yield_now().await;
907
908 let write_guard =
911 tokio::time::timeout(std::time::Duration::from_millis(100), service.conn.write())
912 .await
913 .expect(
914 "conn write lock should not be held while report_latency waits on last_ping",
915 );
916
917 drop(write_guard);
918 drop(last_ping_guard);
919
920 let _ = handle.await;
921 server_handle.abort();
922 }
923}