reth_node_metrics/
server.rs

1use crate::{
2    chain::ChainSpecInfo,
3    hooks::{Hook, Hooks},
4    recorder::install_prometheus_recorder,
5    version::VersionInfo,
6};
7use eyre::WrapErr;
8use http::{header::CONTENT_TYPE, HeaderValue, Response};
9use metrics::describe_gauge;
10use metrics_process::Collector;
11use reth_metrics::metrics::Unit;
12use reth_tasks::TaskExecutor;
13use std::{convert::Infallible, net::SocketAddr, sync::Arc};
14
15/// Configuration for the [`MetricServer`]
16#[derive(Debug)]
17pub struct MetricServerConfig {
18    listen_addr: SocketAddr,
19    version_info: VersionInfo,
20    chain_spec_info: ChainSpecInfo,
21    task_executor: TaskExecutor,
22    hooks: Hooks,
23}
24
25impl MetricServerConfig {
26    /// Create a new [`MetricServerConfig`] with the given configuration
27    pub const fn new(
28        listen_addr: SocketAddr,
29        version_info: VersionInfo,
30        chain_spec_info: ChainSpecInfo,
31        task_executor: TaskExecutor,
32        hooks: Hooks,
33    ) -> Self {
34        Self { listen_addr, hooks, task_executor, version_info, chain_spec_info }
35    }
36}
37
38/// [`MetricServer`] responsible for serving the metrics endpoint
39#[derive(Debug)]
40pub struct MetricServer {
41    config: MetricServerConfig,
42}
43
44impl MetricServer {
45    /// Create a new [`MetricServer`] with the given configuration
46    pub const fn new(config: MetricServerConfig) -> Self {
47        Self { config }
48    }
49
50    /// Spawns the metrics server
51    pub async fn serve(&self) -> eyre::Result<()> {
52        let MetricServerConfig { listen_addr, hooks, task_executor, version_info, chain_spec_info } =
53            &self.config;
54
55        let hooks = hooks.clone();
56        self.start_endpoint(
57            *listen_addr,
58            Arc::new(move || hooks.iter().for_each(|hook| hook())),
59            task_executor.clone(),
60        )
61        .await
62        .wrap_err("Could not start Prometheus endpoint")?;
63
64        // Describe metrics after recorder installation
65        describe_db_metrics();
66        describe_static_file_metrics();
67        Collector::default().describe();
68        describe_memory_stats();
69        describe_io_stats();
70
71        version_info.register_version_metrics();
72        chain_spec_info.register_chain_spec_metrics();
73
74        Ok(())
75    }
76
77    async fn start_endpoint<F: Hook + 'static>(
78        &self,
79        listen_addr: SocketAddr,
80        hook: Arc<F>,
81        task_executor: TaskExecutor,
82    ) -> eyre::Result<()> {
83        let listener = tokio::net::TcpListener::bind(listen_addr)
84            .await
85            .wrap_err("Could not bind to address")?;
86
87        task_executor.spawn_with_graceful_shutdown_signal(|mut signal| async move {
88            loop {
89                let io = tokio::select! {
90                    _ = &mut signal => break,
91                    io = listener.accept() => {
92                        match io {
93                            Ok((stream, _remote_addr)) => stream,
94                            Err(err) => {
95                                tracing::error!(%err, "failed to accept connection");
96                                continue;
97                            }
98                        }
99                    }
100                };
101
102                let handle = install_prometheus_recorder();
103                let hook = hook.clone();
104                let service = tower::service_fn(move |_| {
105                    (hook)();
106                    let metrics = handle.handle().render();
107                    let mut response = Response::new(metrics);
108                    response
109                        .headers_mut()
110                        .insert(CONTENT_TYPE, HeaderValue::from_static("text/plain"));
111                    async move { Ok::<_, Infallible>(response) }
112                });
113
114                let mut shutdown = signal.clone().ignore_guard();
115                tokio::task::spawn(async move {
116                    let _ =
117                        jsonrpsee_server::serve_with_graceful_shutdown(io, service, &mut shutdown)
118                            .await
119                            .inspect_err(
120                                |error| tracing::debug!(%error, "failed to serve request"),
121                            );
122                });
123            }
124        });
125
126        Ok(())
127    }
128}
129
130fn describe_db_metrics() {
131    describe_gauge!("db.table_size", Unit::Bytes, "The size of a database table (in bytes)");
132    describe_gauge!("db.table_pages", "The number of database pages for a table");
133    describe_gauge!("db.table_entries", "The number of entries for a table");
134    describe_gauge!("db.freelist", "The number of pages on the freelist");
135    describe_gauge!("db.page_size", Unit::Bytes, "The size of a database page (in bytes)");
136    describe_gauge!(
137        "db.timed_out_not_aborted_transactions",
138        "Number of timed out transactions that were not aborted by the user yet"
139    );
140}
141
142fn describe_static_file_metrics() {
143    describe_gauge!("static_files.segment_size", Unit::Bytes, "The size of a static file segment");
144    describe_gauge!("static_files.segment_files", "The number of files for a static file segment");
145    describe_gauge!(
146        "static_files.segment_entries",
147        "The number of entries for a static file segment"
148    );
149}
150
/// Registers descriptions for the `jemalloc.*` gauges; every gauge is measured in bytes.
///
/// Only compiled on unix targets with the `jemalloc` feature enabled.
#[cfg(all(feature = "jemalloc", unix))]
fn describe_memory_stats() {
    describe_gauge!(
        "jemalloc.active",
        Unit::Bytes,
        "Total number of bytes in active pages allocated by the application"
    );
    describe_gauge!(
        "jemalloc.allocated",
        Unit::Bytes,
        "Total number of bytes allocated by the application"
    );
    describe_gauge!(
        "jemalloc.mapped",
        Unit::Bytes,
        "Total number of bytes in active extents mapped by the allocator"
    );
    describe_gauge!(
        "jemalloc.metadata",
        Unit::Bytes,
        "Total number of bytes dedicated to jemalloc metadata"
    );
    describe_gauge!(
        "jemalloc.resident",
        Unit::Bytes,
        "Total number of bytes in physically resident data pages mapped by the allocator"
    );
    describe_gauge!(
        "jemalloc.retained",
        Unit::Bytes,
        "Total number of bytes in virtual memory mappings that were retained rather than \
        being returned to the operating system via e.g. munmap(2)"
    );
}

/// Fallback used when the `jemalloc` feature is off or the target is not unix: does nothing.
#[cfg(not(all(feature = "jemalloc", unix)))]
const fn describe_memory_stats() {}
188
189#[cfg(target_os = "linux")]
190fn describe_io_stats() {
191    use metrics::describe_counter;
192
193    describe_counter!("io.rchar", "Characters read");
194    describe_counter!("io.wchar", "Characters written");
195    describe_counter!("io.syscr", "Read syscalls");
196    describe_counter!("io.syscw", "Write syscalls");
197    describe_counter!("io.read_bytes", Unit::Bytes, "Bytes read");
198    describe_counter!("io.write_bytes", Unit::Bytes, "Bytes written");
199    describe_counter!("io.cancelled_write_bytes", Unit::Bytes, "Cancelled write bytes");
200}
201
202#[cfg(not(target_os = "linux"))]
203const fn describe_io_stats() {}
204
#[cfg(test)]
mod tests {
    use super::*;
    use reqwest::Client;
    use reth_tasks::TaskManager;
    use socket2::{Domain, Socket, Type};
    use std::net::{SocketAddr, TcpListener};

    /// Binds a loopback socket to port 0 and returns the address it ended up bound to.
    ///
    /// The probe listener is dropped when this function returns.
    fn get_random_available_addr() -> SocketAddr {
        let any_port: SocketAddr = "127.0.0.1:0".parse().unwrap();

        let probe = Socket::new(Domain::IPV4, Type::STREAM, None).unwrap();
        probe.set_reuse_address(true).unwrap();
        probe.bind(&any_port.into()).unwrap();
        probe.listen(1).unwrap();

        let listener: TcpListener = probe.into();
        listener.local_addr().unwrap()
    }

    /// Starts the server on a free port and checks that the process metrics are exposed.
    #[tokio::test]
    async fn test_metrics_endpoint() {
        let listen_addr = get_random_available_addr();

        let tasks = TaskManager::current();
        let config = MetricServerConfig::new(
            listen_addr,
            VersionInfo {
                version: "test",
                build_timestamp: "test",
                cargo_features: "test",
                git_sha: "test",
                target_triple: "test",
                build_profile: "test",
            },
            ChainSpecInfo { name: "test".to_string() },
            tasks.executor(),
            Hooks::builder().build(),
        );

        MetricServer::new(config).serve().await.unwrap();

        // Scrape the endpoint once.
        let url = format!("http://{listen_addr}");
        let response = Client::new().get(&url).send().await.unwrap();
        assert!(response.status().is_success());

        // The rendered body must contain the process metrics.
        let body = response.text().await.unwrap();
        for expected in ["reth_process_cpu_seconds_total", "reth_process_start_time_seconds"] {
            assert!(body.contains(expected));
        }
    }
}