reth_node_metrics/
server.rs

1use crate::{
2    chain::ChainSpecInfo,
3    hooks::{Hook, Hooks},
4    recorder::install_prometheus_recorder,
5    version::VersionInfo,
6};
7use eyre::WrapErr;
8use http::{header::CONTENT_TYPE, HeaderValue, Response};
9use metrics::describe_gauge;
10use metrics_process::Collector;
11use reqwest::Client;
12use reth_metrics::metrics::Unit;
13use reth_tasks::TaskExecutor;
14use std::{convert::Infallible, net::SocketAddr, sync::Arc, time::Duration};
15
16/// Configuration for the [`MetricServer`]
17#[derive(Debug)]
18pub struct MetricServerConfig {
19    listen_addr: SocketAddr,
20    version_info: VersionInfo,
21    chain_spec_info: ChainSpecInfo,
22    task_executor: TaskExecutor,
23    hooks: Hooks,
24    push_gateway_url: Option<String>,
25    push_gateway_interval: Duration,
26}
27
28impl MetricServerConfig {
29    /// Create a new [`MetricServerConfig`] with the given configuration
30    pub const fn new(
31        listen_addr: SocketAddr,
32        version_info: VersionInfo,
33        chain_spec_info: ChainSpecInfo,
34        task_executor: TaskExecutor,
35        hooks: Hooks,
36    ) -> Self {
37        Self {
38            listen_addr,
39            hooks,
40            task_executor,
41            version_info,
42            chain_spec_info,
43            push_gateway_url: None,
44            push_gateway_interval: Duration::from_secs(5),
45        }
46    }
47
48    /// Set the gateway URL and interval for pushing metrics
49    pub fn with_push_gateway(mut self, url: Option<String>, interval: Duration) -> Self {
50        self.push_gateway_url = url;
51        self.push_gateway_interval = interval;
52        self
53    }
54}
55
56/// [`MetricServer`] responsible for serving the metrics endpoint
57#[derive(Debug)]
58pub struct MetricServer {
59    config: MetricServerConfig,
60}
61
62impl MetricServer {
63    /// Create a new [`MetricServer`] with the given configuration
64    pub const fn new(config: MetricServerConfig) -> Self {
65        Self { config }
66    }
67
68    /// Spawns the metrics server
69    pub async fn serve(&self) -> eyre::Result<()> {
70        let MetricServerConfig {
71            listen_addr,
72            hooks,
73            task_executor,
74            version_info,
75            chain_spec_info,
76            push_gateway_url,
77            push_gateway_interval,
78        } = &self.config;
79
80        let hooks_for_endpoint = hooks.clone();
81        self.start_endpoint(
82            *listen_addr,
83            Arc::new(move || hooks_for_endpoint.iter().for_each(|hook| hook())),
84            task_executor.clone(),
85        )
86        .await
87        .wrap_err_with(|| format!("Could not start Prometheus endpoint at {listen_addr}"))?;
88
89        // Start push-gateway task if configured
90        if let Some(url) = push_gateway_url {
91            self.start_push_gateway_task(
92                url.clone(),
93                *push_gateway_interval,
94                hooks.clone(),
95                task_executor.clone(),
96            )?;
97        }
98
99        // Describe metrics after recorder installation
100        describe_db_metrics();
101        describe_static_file_metrics();
102        Collector::default().describe();
103        describe_memory_stats();
104        describe_io_stats();
105
106        version_info.register_version_metrics();
107        chain_spec_info.register_chain_spec_metrics();
108
109        Ok(())
110    }
111
112    async fn start_endpoint<F: Hook + 'static>(
113        &self,
114        listen_addr: SocketAddr,
115        hook: Arc<F>,
116        task_executor: TaskExecutor,
117    ) -> eyre::Result<()> {
118        let listener = tokio::net::TcpListener::bind(listen_addr)
119            .await
120            .wrap_err("Could not bind to address")?;
121
122        tracing::info!(target: "reth::cli", "Starting metrics endpoint at {}", listener.local_addr().unwrap());
123
124        task_executor.spawn_with_graceful_shutdown_signal(|mut signal| {
125            Box::pin(async move {
126                loop {
127                    let io = tokio::select! {
128                        _ = &mut signal => break,
129                        io = listener.accept() => {
130                            match io {
131                                Ok((stream, _remote_addr)) => stream,
132                                Err(err) => {
133                                    tracing::error!(%err, "failed to accept connection");
134                                    continue;
135                                }
136                            }
137                        }
138                    };
139
140                    let handle = install_prometheus_recorder();
141                    let hook = hook.clone();
142                    let service = tower::service_fn(move |_| {
143                        (hook)();
144                        let metrics = handle.handle().render();
145                        let mut response = Response::new(metrics);
146                        response
147                            .headers_mut()
148                            .insert(CONTENT_TYPE, HeaderValue::from_static("text/plain"));
149                        async move { Ok::<_, Infallible>(response) }
150                    });
151
152                    let mut shutdown = signal.clone().ignore_guard();
153                    tokio::task::spawn(async move {
154                        let _ = jsonrpsee_server::serve_with_graceful_shutdown(
155                            io,
156                            service,
157                            &mut shutdown,
158                        )
159                        .await
160                        .inspect_err(|error| tracing::debug!(%error, "failed to serve request"));
161                    });
162                }
163            })
164        });
165
166        Ok(())
167    }
168
169    /// Starts a background task to push metrics to a metrics gateway
170    fn start_push_gateway_task(
171        &self,
172        url: String,
173        interval: Duration,
174        hooks: Hooks,
175        task_executor: TaskExecutor,
176    ) -> eyre::Result<()> {
177        let client = Client::builder()
178            .build()
179            .wrap_err("Could not create HTTP client to push metrics to gateway")?;
180        task_executor.spawn_with_graceful_shutdown_signal(move |mut signal| {
181            Box::pin(async move {
182                tracing::info!(url = %url, interval = ?interval, "Starting task to push metrics to gateway");
183                let handle = install_prometheus_recorder();
184                loop {
185                    tokio::select! {
186                        _ = &mut signal => {
187                            tracing::info!("Shutting down task to push metrics to gateway");
188                            break;
189                        }
190                        _ = tokio::time::sleep(interval) => {
191                            hooks.iter().for_each(|hook| hook());
192                            let metrics = handle.handle().render();
193                            match client.put(&url).header("Content-Type", "text/plain").body(metrics).send().await {
194                                Ok(response) => {
195                                    if !response.status().is_success() {
196                                        tracing::warn!(
197                                            status = %response.status(),
198                                            "Failed to push metrics to gateway"
199                                        );
200                                    }
201                                }
202                                Err(err) => {
203                                    tracing::warn!(%err, "Failed to push metrics to gateway");
204                                }
205                            }
206                        }
207                    }
208                }
209            })
210        });
211        Ok(())
212    }
213}
214
215fn describe_db_metrics() {
216    describe_gauge!("db.table_size", Unit::Bytes, "The size of a database table (in bytes)");
217    describe_gauge!("db.table_pages", "The number of database pages for a table");
218    describe_gauge!("db.table_entries", "The number of entries for a table");
219    describe_gauge!("db.freelist", "The number of pages on the freelist");
220    describe_gauge!("db.page_size", Unit::Bytes, "The size of a database page (in bytes)");
221    describe_gauge!(
222        "db.timed_out_not_aborted_transactions",
223        "Number of timed out transactions that were not aborted by the user yet"
224    );
225}
226
227fn describe_static_file_metrics() {
228    describe_gauge!("static_files.segment_size", Unit::Bytes, "The size of a static file segment");
229    describe_gauge!("static_files.segment_files", "The number of files for a static file segment");
230    describe_gauge!(
231        "static_files.segment_entries",
232        "The number of entries for a static file segment"
233    );
234}
235
/// Describes the jemalloc memory gauges; every one of them is measured in bytes.
///
/// Only compiled on unix targets with the `jemalloc` feature enabled.
#[cfg(all(feature = "jemalloc", unix))]
fn describe_memory_stats() {
    // (metric name, description)
    let gauges = [
        ("jemalloc.active", "Total number of bytes in active pages allocated by the application"),
        ("jemalloc.allocated", "Total number of bytes allocated by the application"),
        ("jemalloc.mapped", "Total number of bytes in active extents mapped by the allocator"),
        ("jemalloc.metadata", "Total number of bytes dedicated to jemalloc metadata"),
        (
            "jemalloc.resident",
            "Total number of bytes in physically resident data pages mapped by the allocator",
        ),
        (
            "jemalloc.retained",
            "Total number of bytes in virtual memory mappings that were retained rather than \
             being returned to the operating system via e.g. munmap(2)",
        ),
    ];

    for (name, description) in gauges {
        describe_gauge!(name, Unit::Bytes, description);
    }
}
270
/// No-op counterpart of the jemalloc memory descriptions, compiled whenever the `jemalloc`
/// feature is disabled or the target is not unix.
#[cfg(not(all(feature = "jemalloc", unix)))]
const fn describe_memory_stats() {
    // Intentionally empty: there are no jemalloc metrics to describe in this configuration.
}
273
274#[cfg(target_os = "linux")]
275fn describe_io_stats() {
276    use metrics::describe_counter;
277
278    describe_counter!("io.rchar", "Characters read");
279    describe_counter!("io.wchar", "Characters written");
280    describe_counter!("io.syscr", "Read syscalls");
281    describe_counter!("io.syscw", "Write syscalls");
282    describe_counter!("io.read_bytes", Unit::Bytes, "Bytes read");
283    describe_counter!("io.write_bytes", Unit::Bytes, "Bytes written");
284    describe_counter!("io.cancelled_write_bytes", Unit::Bytes, "Cancelled write bytes");
285}
286
/// No-op counterpart of the I/O counter descriptions, compiled on every target other than
/// Linux.
#[cfg(not(target_os = "linux"))]
const fn describe_io_stats() {
    // Intentionally empty: no I/O counters are described on this target.
}
289
#[cfg(test)]
mod tests {
    use super::*;
    use reqwest::Client;
    use reth_tasks::TaskManager;
    use socket2::{Domain, Socket, Type};
    use std::net::{SocketAddr, TcpListener};

    /// Binds a loopback socket to port 0 and returns the address it ended up with.
    ///
    /// The probing socket is closed again before the function returns.
    fn get_random_available_addr() -> SocketAddr {
        let any_port: SocketAddr = "127.0.0.1:0".parse().unwrap();

        let socket = Socket::new(Domain::IPV4, Type::STREAM, None).unwrap();
        socket.set_reuse_address(true).unwrap();
        socket.bind(&any_port.into()).unwrap();
        socket.listen(1).unwrap();

        TcpListener::from(socket).local_addr().unwrap()
    }

    /// Starts the server and checks that a scrape succeeds and contains process metrics.
    #[tokio::test]
    async fn test_metrics_endpoint() {
        let listen_addr = get_random_available_addr();

        // Bound to a name so the manager lives until the end of the test.
        let tasks = TaskManager::current();

        let config = MetricServerConfig::new(
            listen_addr,
            VersionInfo {
                version: "test",
                build_timestamp: "test",
                cargo_features: "test",
                git_sha: "test",
                target_triple: "test",
                build_profile: "test",
            },
            ChainSpecInfo { name: "test".to_string() },
            tasks.executor(),
            Hooks::builder().build(),
        );

        MetricServer::new(config).serve().await.unwrap();

        // Scrape the endpoint once.
        let scrape = Client::new().get(format!("http://{listen_addr}")).send().await.unwrap();
        assert!(scrape.status().is_success());

        // The rendered body must contain the process metrics.
        let text = scrape.text().await.unwrap();
        for expected in ["reth_process_cpu_seconds_total", "reth_process_start_time_seconds"] {
            assert!(text.contains(expected));
        }
    }
}