1use crate::{
2 chain::ChainSpecInfo,
3 hooks::{Hook, Hooks},
4 recorder::install_prometheus_recorder,
5 version::VersionInfo,
6};
7use eyre::WrapErr;
8use http::{header::CONTENT_TYPE, HeaderValue, Response};
9use metrics::describe_gauge;
10use metrics_process::Collector;
11use reth_metrics::metrics::Unit;
12use reth_tasks::TaskExecutor;
13use std::{convert::Infallible, net::SocketAddr, sync::Arc};
14
15#[derive(Debug)]
17pub struct MetricServerConfig {
18 listen_addr: SocketAddr,
19 version_info: VersionInfo,
20 chain_spec_info: ChainSpecInfo,
21 task_executor: TaskExecutor,
22 hooks: Hooks,
23}
24
25impl MetricServerConfig {
26 pub const fn new(
28 listen_addr: SocketAddr,
29 version_info: VersionInfo,
30 chain_spec_info: ChainSpecInfo,
31 task_executor: TaskExecutor,
32 hooks: Hooks,
33 ) -> Self {
34 Self { listen_addr, hooks, task_executor, version_info, chain_spec_info }
35 }
36}
37
38#[derive(Debug)]
40pub struct MetricServer {
41 config: MetricServerConfig,
42}
43
44impl MetricServer {
45 pub const fn new(config: MetricServerConfig) -> Self {
47 Self { config }
48 }
49
50 pub async fn serve(&self) -> eyre::Result<()> {
52 let MetricServerConfig { listen_addr, hooks, task_executor, version_info, chain_spec_info } =
53 &self.config;
54
55 let hooks = hooks.clone();
56 self.start_endpoint(
57 *listen_addr,
58 Arc::new(move || hooks.iter().for_each(|hook| hook())),
59 task_executor.clone(),
60 )
61 .await
62 .wrap_err("Could not start Prometheus endpoint")?;
63
64 describe_db_metrics();
66 describe_static_file_metrics();
67 Collector::default().describe();
68 describe_memory_stats();
69 describe_io_stats();
70
71 version_info.register_version_metrics();
72 chain_spec_info.register_chain_spec_metrics();
73
74 Ok(())
75 }
76
77 async fn start_endpoint<F: Hook + 'static>(
78 &self,
79 listen_addr: SocketAddr,
80 hook: Arc<F>,
81 task_executor: TaskExecutor,
82 ) -> eyre::Result<()> {
83 let listener = tokio::net::TcpListener::bind(listen_addr)
84 .await
85 .wrap_err("Could not bind to address")?;
86
87 task_executor.spawn_with_graceful_shutdown_signal(|mut signal| async move {
88 loop {
89 let io = tokio::select! {
90 _ = &mut signal => break,
91 io = listener.accept() => {
92 match io {
93 Ok((stream, _remote_addr)) => stream,
94 Err(err) => {
95 tracing::error!(%err, "failed to accept connection");
96 continue;
97 }
98 }
99 }
100 };
101
102 let handle = install_prometheus_recorder();
103 let hook = hook.clone();
104 let service = tower::service_fn(move |_| {
105 (hook)();
106 let metrics = handle.handle().render();
107 let mut response = Response::new(metrics);
108 response
109 .headers_mut()
110 .insert(CONTENT_TYPE, HeaderValue::from_static("text/plain"));
111 async move { Ok::<_, Infallible>(response) }
112 });
113
114 let mut shutdown = signal.clone().ignore_guard();
115 tokio::task::spawn(async move {
116 let _ =
117 jsonrpsee_server::serve_with_graceful_shutdown(io, service, &mut shutdown)
118 .await
119 .inspect_err(
120 |error| tracing::debug!(%error, "failed to serve request"),
121 );
122 });
123 }
124 });
125
126 Ok(())
127 }
128}
129
130fn describe_db_metrics() {
131 describe_gauge!("db.table_size", Unit::Bytes, "The size of a database table (in bytes)");
132 describe_gauge!("db.table_pages", "The number of database pages for a table");
133 describe_gauge!("db.table_entries", "The number of entries for a table");
134 describe_gauge!("db.freelist", "The number of pages on the freelist");
135 describe_gauge!("db.page_size", Unit::Bytes, "The size of a database page (in bytes)");
136 describe_gauge!(
137 "db.timed_out_not_aborted_transactions",
138 "Number of timed out transactions that were not aborted by the user yet"
139 );
140}
141
142fn describe_static_file_metrics() {
143 describe_gauge!("static_files.segment_size", Unit::Bytes, "The size of a static file segment");
144 describe_gauge!("static_files.segment_files", "The number of files for a static file segment");
145 describe_gauge!(
146 "static_files.segment_entries",
147 "The number of entries for a static file segment"
148 );
149}
150
/// Registers descriptions for the jemalloc memory gauges.
///
/// Compiled only on unix targets with the `jemalloc` feature enabled. Every gauge listed here
/// is described with [`Unit::Bytes`].
#[cfg(all(feature = "jemalloc", unix))]
fn describe_memory_stats() {
    const JEMALLOC_GAUGES: [(&str, &str); 6] = [
        ("jemalloc.active", "Total number of bytes in active pages allocated by the application"),
        ("jemalloc.allocated", "Total number of bytes allocated by the application"),
        ("jemalloc.mapped", "Total number of bytes in active extents mapped by the allocator"),
        ("jemalloc.metadata", "Total number of bytes dedicated to jemalloc metadata"),
        (
            "jemalloc.resident",
            "Total number of bytes in physically resident data pages mapped by the allocator",
        ),
        (
            "jemalloc.retained",
            "Total number of bytes in virtual memory mappings that were retained rather than \
             being returned to the operating system via e.g. munmap(2)",
        ),
    ];

    for (name, description) in JEMALLOC_GAUGES {
        describe_gauge!(name, Unit::Bytes, description);
    }
}
185
/// No-op stand-in used when the jemalloc gauges are not compiled in, i.e. when the `jemalloc`
/// feature is disabled or the target is not unix.
#[cfg(not(all(feature = "jemalloc", unix)))]
const fn describe_memory_stats() {
    // Intentionally empty: there are no memory gauges to describe in this configuration.
}
188
189#[cfg(target_os = "linux")]
190fn describe_io_stats() {
191 use metrics::describe_counter;
192
193 describe_counter!("io.rchar", "Characters read");
194 describe_counter!("io.wchar", "Characters written");
195 describe_counter!("io.syscr", "Read syscalls");
196 describe_counter!("io.syscw", "Write syscalls");
197 describe_counter!("io.read_bytes", Unit::Bytes, "Bytes read");
198 describe_counter!("io.write_bytes", Unit::Bytes, "Bytes written");
199 describe_counter!("io.cancelled_write_bytes", Unit::Bytes, "Cancelled write bytes");
200}
201
/// No-op stand-in used on targets other than Linux, where no I/O counters are described.
#[cfg(not(target_os = "linux"))]
const fn describe_io_stats() {
    // Intentionally empty: the I/O counters are only described on Linux.
}
204
#[cfg(test)]
mod tests {
    use super::*;
    use reqwest::Client;
    use reth_tasks::TaskManager;
    use socket2::{Domain, Socket, Type};
    use std::net::{SocketAddr, TcpListener};

    /// Returns a loopback address with a port picked by the OS.
    ///
    /// A socket is bound to port 0 with address reuse enabled, its local address is read back,
    /// and the socket is then dropped.
    fn get_random_available_addr() -> SocketAddr {
        let any_port: SocketAddr = "127.0.0.1:0".parse().unwrap();

        let socket = Socket::new(Domain::IPV4, Type::STREAM, None).unwrap();
        socket.set_reuse_address(true).unwrap();
        socket.bind(&any_port.into()).unwrap();
        socket.listen(1).unwrap();

        TcpListener::from(socket).local_addr().unwrap()
    }

    /// Starts a metrics server on a free port and checks that a GET request succeeds and that
    /// the response body contains the expected process metrics.
    #[tokio::test]
    async fn test_metrics_endpoint() {
        let version_info = VersionInfo {
            version: "test",
            build_timestamp: "test",
            cargo_features: "test",
            git_sha: "test",
            target_triple: "test",
            build_profile: "test",
        };
        let chain_spec_info = ChainSpecInfo { name: String::from("test") };

        // Bound to a named variable so the manager lives until the end of the test.
        let tasks = TaskManager::current();
        let listen_addr = get_random_available_addr();

        let config = MetricServerConfig::new(
            listen_addr,
            version_info,
            chain_spec_info,
            tasks.executor(),
            Hooks::builder().build(),
        );
        let server = MetricServer::new(config);
        server.serve().await.unwrap();

        // Fetch the endpoint over plain HTTP.
        let url = format!("http://{listen_addr}");
        let client = Client::new();
        let response = client.get(&url).send().await.unwrap();
        assert!(response.status().is_success());

        // The rendered output must include the process metrics.
        let body = response.text().await.unwrap();
        assert!(body.contains("reth_process_cpu_seconds_total"));
        assert!(body.contains("reth_process_start_time_seconds"));
    }
}