1use crate::{
2 chain::ChainSpecInfo,
3 hooks::{Hook, Hooks},
4 recorder::install_prometheus_recorder,
5 version::VersionInfo,
6};
7use eyre::WrapErr;
8use http::{header::CONTENT_TYPE, HeaderValue, Response};
9use metrics::describe_gauge;
10use metrics_process::Collector;
11use reth_metrics::metrics::Unit;
12use reth_tasks::TaskExecutor;
13use std::{convert::Infallible, net::SocketAddr, sync::Arc};
14
15#[derive(Debug)]
17pub struct MetricServerConfig {
18 listen_addr: SocketAddr,
19 version_info: VersionInfo,
20 chain_spec_info: ChainSpecInfo,
21 task_executor: TaskExecutor,
22 hooks: Hooks,
23}
24
25impl MetricServerConfig {
26 pub const fn new(
28 listen_addr: SocketAddr,
29 version_info: VersionInfo,
30 chain_spec_info: ChainSpecInfo,
31 task_executor: TaskExecutor,
32 hooks: Hooks,
33 ) -> Self {
34 Self { listen_addr, hooks, task_executor, version_info, chain_spec_info }
35 }
36}
37
38#[derive(Debug)]
40pub struct MetricServer {
41 config: MetricServerConfig,
42}
43
44impl MetricServer {
45 pub const fn new(config: MetricServerConfig) -> Self {
47 Self { config }
48 }
49
50 pub async fn serve(&self) -> eyre::Result<()> {
52 let MetricServerConfig { listen_addr, hooks, task_executor, version_info, chain_spec_info } =
53 &self.config;
54
55 let hooks = hooks.clone();
56 self.start_endpoint(
57 *listen_addr,
58 Arc::new(move || hooks.iter().for_each(|hook| hook())),
59 task_executor.clone(),
60 )
61 .await
62 .wrap_err_with(|| format!("Could not start Prometheus endpoint at {listen_addr}"))?;
63
64 describe_db_metrics();
66 describe_static_file_metrics();
67 Collector::default().describe();
68 describe_memory_stats();
69 describe_io_stats();
70
71 version_info.register_version_metrics();
72 chain_spec_info.register_chain_spec_metrics();
73
74 Ok(())
75 }
76
77 async fn start_endpoint<F: Hook + 'static>(
78 &self,
79 listen_addr: SocketAddr,
80 hook: Arc<F>,
81 task_executor: TaskExecutor,
82 ) -> eyre::Result<()> {
83 let listener = tokio::net::TcpListener::bind(listen_addr)
84 .await
85 .wrap_err("Could not bind to address")?;
86
87 task_executor.spawn_with_graceful_shutdown_signal(|mut signal| {
88 Box::pin(async move {
89 loop {
90 let io = tokio::select! {
91 _ = &mut signal => break,
92 io = listener.accept() => {
93 match io {
94 Ok((stream, _remote_addr)) => stream,
95 Err(err) => {
96 tracing::error!(%err, "failed to accept connection");
97 continue;
98 }
99 }
100 }
101 };
102
103 let handle = install_prometheus_recorder();
104 let hook = hook.clone();
105 let service = tower::service_fn(move |_| {
106 (hook)();
107 let metrics = handle.handle().render();
108 let mut response = Response::new(metrics);
109 response
110 .headers_mut()
111 .insert(CONTENT_TYPE, HeaderValue::from_static("text/plain"));
112 async move { Ok::<_, Infallible>(response) }
113 });
114
115 let mut shutdown = signal.clone().ignore_guard();
116 tokio::task::spawn(async move {
117 let _ = jsonrpsee_server::serve_with_graceful_shutdown(
118 io,
119 service,
120 &mut shutdown,
121 )
122 .await
123 .inspect_err(|error| tracing::debug!(%error, "failed to serve request"));
124 });
125 }
126 })
127 });
128
129 Ok(())
130 }
131}
132
133fn describe_db_metrics() {
134 describe_gauge!("db.table_size", Unit::Bytes, "The size of a database table (in bytes)");
135 describe_gauge!("db.table_pages", "The number of database pages for a table");
136 describe_gauge!("db.table_entries", "The number of entries for a table");
137 describe_gauge!("db.freelist", "The number of pages on the freelist");
138 describe_gauge!("db.page_size", Unit::Bytes, "The size of a database page (in bytes)");
139 describe_gauge!(
140 "db.timed_out_not_aborted_transactions",
141 "Number of timed out transactions that were not aborted by the user yet"
142 );
143}
144
145fn describe_static_file_metrics() {
146 describe_gauge!("static_files.segment_size", Unit::Bytes, "The size of a static file segment");
147 describe_gauge!("static_files.segment_files", "The number of files for a static file segment");
148 describe_gauge!(
149 "static_files.segment_entries",
150 "The number of entries for a static file segment"
151 );
152}
153
/// Describes the jemalloc allocator gauges (`jemalloc.*`, all in bytes) through the
/// `metrics` facade.
///
/// Only compiled when the `jemalloc` feature is enabled on a Unix target.
#[cfg(all(feature = "jemalloc", unix))]
fn describe_memory_stats() {
    // Memory attributed to the application itself.
    describe_gauge!(
        "jemalloc.active",
        Unit::Bytes,
        "Total number of bytes in active pages allocated by the application"
    );
    describe_gauge!(
        "jemalloc.allocated",
        Unit::Bytes,
        "Total number of bytes allocated by the application"
    );

    // Memory held by the allocator.
    describe_gauge!(
        "jemalloc.mapped",
        Unit::Bytes,
        "Total number of bytes in active extents mapped by the allocator"
    );
    describe_gauge!(
        "jemalloc.metadata",
        Unit::Bytes,
        "Total number of bytes dedicated to jemalloc metadata"
    );
    describe_gauge!(
        "jemalloc.resident",
        Unit::Bytes,
        "Total number of bytes in physically resident data pages mapped by the allocator"
    );

    // The trailing `\` joins the two source lines into a single description string; the
    // next line's leading whitespace is not part of the string.
    describe_gauge!(
        "jemalloc.retained",
        Unit::Bytes,
        "Total number of bytes in virtual memory mappings that were retained rather than \
         being returned to the operating system via e.g. munmap(2)"
    );
}
188
/// Fallback used when jemalloc statistics are unavailable (feature `jemalloc` disabled or
/// non-Unix target).
///
/// Intentionally describes nothing.
#[cfg(not(all(feature = "jemalloc", unix)))]
const fn describe_memory_stats() {
    // Nothing to describe on this configuration.
}
191
192#[cfg(target_os = "linux")]
193fn describe_io_stats() {
194 use metrics::describe_counter;
195
196 describe_counter!("io.rchar", "Characters read");
197 describe_counter!("io.wchar", "Characters written");
198 describe_counter!("io.syscr", "Read syscalls");
199 describe_counter!("io.syscw", "Write syscalls");
200 describe_counter!("io.read_bytes", Unit::Bytes, "Bytes read");
201 describe_counter!("io.write_bytes", Unit::Bytes, "Bytes written");
202 describe_counter!("io.cancelled_write_bytes", Unit::Bytes, "Cancelled write bytes");
203}
204
/// Fallback for non-Linux targets.
///
/// The I/O counters are only described on Linux, so this does nothing.
#[cfg(not(target_os = "linux"))]
const fn describe_io_stats() {
    // Nothing to describe on this platform.
}
207
#[cfg(test)]
mod tests {
    use super::*;
    use reqwest::Client;
    use reth_tasks::TaskManager;
    use socket2::{Domain, Socket, Type};
    use std::net::{SocketAddr, TcpListener};

    /// Lets the OS pick a free loopback port by binding to port 0, then releases it.
    ///
    /// `SO_REUSEADDR` is set on the probe socket before binding.
    fn get_random_available_addr() -> SocketAddr {
        let ephemeral: SocketAddr = "127.0.0.1:0".parse().unwrap();
        let probe = Socket::new(Domain::IPV4, Type::STREAM, None).unwrap();
        probe.set_reuse_address(true).unwrap();
        probe.bind(&ephemeral.into()).unwrap();
        probe.listen(1).unwrap();
        TcpListener::from(probe).local_addr().unwrap()
    }

    /// Starts a server on a free port and checks that a plain GET returns process metrics.
    #[tokio::test]
    async fn test_metrics_endpoint() {
        let version_info = VersionInfo {
            version: "test",
            build_timestamp: "test",
            cargo_features: "test",
            git_sha: "test",
            target_triple: "test",
            build_profile: "test",
        };
        let chain_spec_info = ChainSpecInfo { name: "test".to_string() };

        // `tasks` stays in scope until the end of the test; its executor runs the server.
        let tasks = TaskManager::current();
        let executor = tasks.executor();
        let hooks = Hooks::builder().build();
        let listen_addr = get_random_available_addr();

        let server = MetricServer::new(MetricServerConfig::new(
            listen_addr,
            version_info,
            chain_spec_info,
            executor,
            hooks,
        ));
        server.serve().await.unwrap();

        let url = format!("http://{listen_addr}");
        let response = Client::new().get(&url).send().await.unwrap();
        assert!(response.status().is_success());

        let body = response.text().await.unwrap();
        for expected in ["reth_process_cpu_seconds_total", "reth_process_start_time_seconds"] {
            assert!(body.contains(expected), "missing `{expected}` in metrics output");
        }
    }
}