reth_node_metrics/
server.rs

1use crate::{
2    chain::ChainSpecInfo,
3    hooks::{Hook, Hooks},
4    recorder::install_prometheus_recorder,
5    version::VersionInfo,
6};
7use eyre::WrapErr;
8use http::{header::CONTENT_TYPE, HeaderValue, Response};
9use metrics::describe_gauge;
10use metrics_process::Collector;
11use reth_metrics::metrics::Unit;
12use reth_tasks::TaskExecutor;
13use std::{convert::Infallible, net::SocketAddr, sync::Arc};
14
15/// Configuration for the [`MetricServer`]
16#[derive(Debug)]
17pub struct MetricServerConfig {
18    listen_addr: SocketAddr,
19    version_info: VersionInfo,
20    chain_spec_info: ChainSpecInfo,
21    task_executor: TaskExecutor,
22    hooks: Hooks,
23}
24
25impl MetricServerConfig {
26    /// Create a new [`MetricServerConfig`] with the given configuration
27    pub const fn new(
28        listen_addr: SocketAddr,
29        version_info: VersionInfo,
30        chain_spec_info: ChainSpecInfo,
31        task_executor: TaskExecutor,
32        hooks: Hooks,
33    ) -> Self {
34        Self { listen_addr, hooks, task_executor, version_info, chain_spec_info }
35    }
36}
37
38/// [`MetricServer`] responsible for serving the metrics endpoint
39#[derive(Debug)]
40pub struct MetricServer {
41    config: MetricServerConfig,
42}
43
44impl MetricServer {
45    /// Create a new [`MetricServer`] with the given configuration
46    pub const fn new(config: MetricServerConfig) -> Self {
47        Self { config }
48    }
49
50    /// Spawns the metrics server
51    pub async fn serve(&self) -> eyre::Result<()> {
52        let MetricServerConfig { listen_addr, hooks, task_executor, version_info, chain_spec_info } =
53            &self.config;
54
55        let hooks = hooks.clone();
56        self.start_endpoint(
57            *listen_addr,
58            Arc::new(move || hooks.iter().for_each(|hook| hook())),
59            task_executor.clone(),
60        )
61        .await
62        .wrap_err_with(|| format!("Could not start Prometheus endpoint at {listen_addr}"))?;
63
64        // Describe metrics after recorder installation
65        describe_db_metrics();
66        describe_static_file_metrics();
67        Collector::default().describe();
68        describe_memory_stats();
69        describe_io_stats();
70
71        version_info.register_version_metrics();
72        chain_spec_info.register_chain_spec_metrics();
73
74        Ok(())
75    }
76
77    async fn start_endpoint<F: Hook + 'static>(
78        &self,
79        listen_addr: SocketAddr,
80        hook: Arc<F>,
81        task_executor: TaskExecutor,
82    ) -> eyre::Result<()> {
83        let listener = tokio::net::TcpListener::bind(listen_addr)
84            .await
85            .wrap_err("Could not bind to address")?;
86
87        task_executor.spawn_with_graceful_shutdown_signal(|mut signal| {
88            Box::pin(async move {
89                loop {
90                    let io = tokio::select! {
91                        _ = &mut signal => break,
92                        io = listener.accept() => {
93                            match io {
94                                Ok((stream, _remote_addr)) => stream,
95                                Err(err) => {
96                                    tracing::error!(%err, "failed to accept connection");
97                                    continue;
98                                }
99                            }
100                        }
101                    };
102
103                    let handle = install_prometheus_recorder();
104                    let hook = hook.clone();
105                    let service = tower::service_fn(move |_| {
106                        (hook)();
107                        let metrics = handle.handle().render();
108                        let mut response = Response::new(metrics);
109                        response
110                            .headers_mut()
111                            .insert(CONTENT_TYPE, HeaderValue::from_static("text/plain"));
112                        async move { Ok::<_, Infallible>(response) }
113                    });
114
115                    let mut shutdown = signal.clone().ignore_guard();
116                    tokio::task::spawn(async move {
117                        let _ = jsonrpsee_server::serve_with_graceful_shutdown(
118                            io,
119                            service,
120                            &mut shutdown,
121                        )
122                        .await
123                        .inspect_err(|error| tracing::debug!(%error, "failed to serve request"));
124                    });
125                }
126            })
127        });
128
129        Ok(())
130    }
131}
132
133fn describe_db_metrics() {
134    describe_gauge!("db.table_size", Unit::Bytes, "The size of a database table (in bytes)");
135    describe_gauge!("db.table_pages", "The number of database pages for a table");
136    describe_gauge!("db.table_entries", "The number of entries for a table");
137    describe_gauge!("db.freelist", "The number of pages on the freelist");
138    describe_gauge!("db.page_size", Unit::Bytes, "The size of a database page (in bytes)");
139    describe_gauge!(
140        "db.timed_out_not_aborted_transactions",
141        "Number of timed out transactions that were not aborted by the user yet"
142    );
143}
144
145fn describe_static_file_metrics() {
146    describe_gauge!("static_files.segment_size", Unit::Bytes, "The size of a static file segment");
147    describe_gauge!("static_files.segment_files", "The number of files for a static file segment");
148    describe_gauge!(
149        "static_files.segment_entries",
150        "The number of entries for a static file segment"
151    );
152}
153
/// Registers descriptions for the jemalloc allocator gauges, all measured in bytes.
#[cfg(all(feature = "jemalloc", unix))]
fn describe_memory_stats() {
    let byte_gauges = [
        ("jemalloc.active", "Total number of bytes in active pages allocated by the application"),
        ("jemalloc.allocated", "Total number of bytes allocated by the application"),
        ("jemalloc.mapped", "Total number of bytes in active extents mapped by the allocator"),
        ("jemalloc.metadata", "Total number of bytes dedicated to jemalloc metadata"),
        (
            "jemalloc.resident",
            "Total number of bytes in physically resident data pages mapped by the allocator",
        ),
        (
            "jemalloc.retained",
            "Total number of bytes in virtual memory mappings that were retained rather than \
             being returned to the operating system via e.g. munmap(2)",
        ),
    ];

    for (name, description) in byte_gauges {
        describe_gauge!(name, Unit::Bytes, description);
    }
}

/// No-op stand-in used unless the `jemalloc` feature is enabled on a unix target.
#[cfg(not(all(feature = "jemalloc", unix)))]
const fn describe_memory_stats() {}
191
192#[cfg(target_os = "linux")]
193fn describe_io_stats() {
194    use metrics::describe_counter;
195
196    describe_counter!("io.rchar", "Characters read");
197    describe_counter!("io.wchar", "Characters written");
198    describe_counter!("io.syscr", "Read syscalls");
199    describe_counter!("io.syscw", "Write syscalls");
200    describe_counter!("io.read_bytes", Unit::Bytes, "Bytes read");
201    describe_counter!("io.write_bytes", Unit::Bytes, "Bytes written");
202    describe_counter!("io.cancelled_write_bytes", Unit::Bytes, "Cancelled write bytes");
203}
204
205#[cfg(not(target_os = "linux"))]
206const fn describe_io_stats() {}
207
#[cfg(test)]
mod tests {
    use super::*;
    use reqwest::Client;
    use reth_tasks::TaskManager;
    use socket2::{Domain, Socket, Type};
    use std::net::{SocketAddr, TcpListener};

    /// Lets the OS pick a free loopback port by binding an ephemeral socket, and returns the
    /// address it was given. The socket is dropped on return, releasing the port again.
    fn get_random_available_addr() -> SocketAddr {
        let ephemeral: SocketAddr = "127.0.0.1:0".parse().unwrap();
        let socket = Socket::new(Domain::IPV4, Type::STREAM, None).unwrap();
        socket.set_reuse_address(true).unwrap();
        socket.bind(&ephemeral.into()).unwrap();
        socket.listen(1).unwrap();
        TcpListener::from(socket).local_addr().unwrap()
    }

    #[tokio::test]
    async fn test_metrics_endpoint() {
        let version_info = VersionInfo {
            version: "test",
            build_timestamp: "test",
            cargo_features: "test",
            git_sha: "test",
            target_triple: "test",
            build_profile: "test",
        };
        let chain_spec_info = ChainSpecInfo { name: "test".to_string() };

        // Keep the manager alive for the whole test so the spawned endpoint keeps running.
        let tasks = TaskManager::current();
        let listen_addr = get_random_available_addr();

        let config = MetricServerConfig::new(
            listen_addr,
            version_info,
            chain_spec_info,
            tasks.executor(),
            Hooks::builder().build(),
        );
        MetricServer::new(config).serve().await.unwrap();

        // Scrape the endpoint once and check the process metrics are exposed.
        let response = Client::new().get(format!("http://{listen_addr}")).send().await.unwrap();
        assert!(response.status().is_success());

        let body = response.text().await.unwrap();
        for expected in ["reth_process_cpu_seconds_total", "reth_process_start_time_seconds"] {
            assert!(body.contains(expected), "metrics output is missing {expected}");
        }
    }
}