1use crate::{
2 chain::ChainSpecInfo,
3 hooks::{Hook, Hooks},
4 recorder::install_prometheus_recorder,
5 version::VersionInfo,
6};
7use eyre::WrapErr;
8use http::{header::CONTENT_TYPE, HeaderValue, Response};
9use metrics::describe_gauge;
10use metrics_process::Collector;
11use reqwest::Client;
12use reth_metrics::metrics::Unit;
13use reth_tasks::TaskExecutor;
14use std::{convert::Infallible, net::SocketAddr, sync::Arc, time::Duration};
15
16#[derive(Debug)]
18pub struct MetricServerConfig {
19 listen_addr: SocketAddr,
20 version_info: VersionInfo,
21 chain_spec_info: ChainSpecInfo,
22 task_executor: TaskExecutor,
23 hooks: Hooks,
24 push_gateway_url: Option<String>,
25 push_gateway_interval: Duration,
26}
27
28impl MetricServerConfig {
29 pub const fn new(
31 listen_addr: SocketAddr,
32 version_info: VersionInfo,
33 chain_spec_info: ChainSpecInfo,
34 task_executor: TaskExecutor,
35 hooks: Hooks,
36 ) -> Self {
37 Self {
38 listen_addr,
39 hooks,
40 task_executor,
41 version_info,
42 chain_spec_info,
43 push_gateway_url: None,
44 push_gateway_interval: Duration::from_secs(5),
45 }
46 }
47
48 pub fn with_push_gateway(mut self, url: Option<String>, interval: Duration) -> Self {
50 self.push_gateway_url = url;
51 self.push_gateway_interval = interval;
52 self
53 }
54}
55
56#[derive(Debug)]
58pub struct MetricServer {
59 config: MetricServerConfig,
60}
61
62impl MetricServer {
63 pub const fn new(config: MetricServerConfig) -> Self {
65 Self { config }
66 }
67
68 pub async fn serve(&self) -> eyre::Result<()> {
70 let MetricServerConfig {
71 listen_addr,
72 hooks,
73 task_executor,
74 version_info,
75 chain_spec_info,
76 push_gateway_url,
77 push_gateway_interval,
78 } = &self.config;
79
80 let hooks_for_endpoint = hooks.clone();
81 self.start_endpoint(
82 *listen_addr,
83 Arc::new(move || hooks_for_endpoint.iter().for_each(|hook| hook())),
84 task_executor.clone(),
85 )
86 .await
87 .wrap_err_with(|| format!("Could not start Prometheus endpoint at {listen_addr}"))?;
88
89 if let Some(url) = push_gateway_url {
91 self.start_push_gateway_task(
92 url.clone(),
93 *push_gateway_interval,
94 hooks.clone(),
95 task_executor.clone(),
96 )?;
97 }
98
99 describe_db_metrics();
101 describe_static_file_metrics();
102 Collector::default().describe();
103 describe_memory_stats();
104 describe_io_stats();
105
106 version_info.register_version_metrics();
107 chain_spec_info.register_chain_spec_metrics();
108
109 Ok(())
110 }
111
112 async fn start_endpoint<F: Hook + 'static>(
113 &self,
114 listen_addr: SocketAddr,
115 hook: Arc<F>,
116 task_executor: TaskExecutor,
117 ) -> eyre::Result<()> {
118 let listener = tokio::net::TcpListener::bind(listen_addr)
119 .await
120 .wrap_err("Could not bind to address")?;
121
122 tracing::info!(target: "reth::cli", "Starting metrics endpoint at {}", listener.local_addr().unwrap());
123
124 task_executor.spawn_with_graceful_shutdown_signal(|mut signal| {
125 Box::pin(async move {
126 loop {
127 let io = tokio::select! {
128 _ = &mut signal => break,
129 io = listener.accept() => {
130 match io {
131 Ok((stream, _remote_addr)) => stream,
132 Err(err) => {
133 tracing::error!(%err, "failed to accept connection");
134 continue;
135 }
136 }
137 }
138 };
139
140 let handle = install_prometheus_recorder();
141 let hook = hook.clone();
142 let service = tower::service_fn(move |_| {
143 (hook)();
144 let metrics = handle.handle().render();
145 let mut response = Response::new(metrics);
146 response
147 .headers_mut()
148 .insert(CONTENT_TYPE, HeaderValue::from_static("text/plain"));
149 async move { Ok::<_, Infallible>(response) }
150 });
151
152 let mut shutdown = signal.clone().ignore_guard();
153 tokio::task::spawn(async move {
154 let _ = jsonrpsee_server::serve_with_graceful_shutdown(
155 io,
156 service,
157 &mut shutdown,
158 )
159 .await
160 .inspect_err(|error| tracing::debug!(%error, "failed to serve request"));
161 });
162 }
163 })
164 });
165
166 Ok(())
167 }
168
169 fn start_push_gateway_task(
171 &self,
172 url: String,
173 interval: Duration,
174 hooks: Hooks,
175 task_executor: TaskExecutor,
176 ) -> eyre::Result<()> {
177 let client = Client::builder()
178 .build()
179 .wrap_err("Could not create HTTP client to push metrics to gateway")?;
180 task_executor.spawn_with_graceful_shutdown_signal(move |mut signal| {
181 Box::pin(async move {
182 tracing::info!(url = %url, interval = ?interval, "Starting task to push metrics to gateway");
183 let handle = install_prometheus_recorder();
184 loop {
185 tokio::select! {
186 _ = &mut signal => {
187 tracing::info!("Shutting down task to push metrics to gateway");
188 break;
189 }
190 _ = tokio::time::sleep(interval) => {
191 hooks.iter().for_each(|hook| hook());
192 let metrics = handle.handle().render();
193 match client.put(&url).header("Content-Type", "text/plain").body(metrics).send().await {
194 Ok(response) => {
195 if !response.status().is_success() {
196 tracing::warn!(
197 status = %response.status(),
198 "Failed to push metrics to gateway"
199 );
200 }
201 }
202 Err(err) => {
203 tracing::warn!(%err, "Failed to push metrics to gateway");
204 }
205 }
206 }
207 }
208 }
209 })
210 });
211 Ok(())
212 }
213}
214
215fn describe_db_metrics() {
216 describe_gauge!("db.table_size", Unit::Bytes, "The size of a database table (in bytes)");
217 describe_gauge!("db.table_pages", "The number of database pages for a table");
218 describe_gauge!("db.table_entries", "The number of entries for a table");
219 describe_gauge!("db.freelist", "The number of pages on the freelist");
220 describe_gauge!("db.page_size", Unit::Bytes, "The size of a database page (in bytes)");
221 describe_gauge!(
222 "db.timed_out_not_aborted_transactions",
223 "Number of timed out transactions that were not aborted by the user yet"
224 );
225}
226
227fn describe_static_file_metrics() {
228 describe_gauge!("static_files.segment_size", Unit::Bytes, "The size of a static file segment");
229 describe_gauge!("static_files.segment_files", "The number of files for a static file segment");
230 describe_gauge!(
231 "static_files.segment_entries",
232 "The number of entries for a static file segment"
233 );
234}
235
/// Registers descriptions for the jemalloc allocator gauges, all measured in bytes.
#[cfg(all(feature = "jemalloc", unix))]
fn describe_memory_stats() {
    /// (metric name, help text) for every jemalloc gauge, in registration order.
    const JEMALLOC_GAUGES: [(&str, &str); 6] = [
        ("jemalloc.active", "Total number of bytes in active pages allocated by the application"),
        ("jemalloc.allocated", "Total number of bytes allocated by the application"),
        ("jemalloc.mapped", "Total number of bytes in active extents mapped by the allocator"),
        ("jemalloc.metadata", "Total number of bytes dedicated to jemalloc metadata"),
        (
            "jemalloc.resident",
            "Total number of bytes in physically resident data pages mapped by the allocator",
        ),
        (
            "jemalloc.retained",
            "Total number of bytes in virtual memory mappings that were retained rather than \
             being returned to the operating system via e.g. munmap(2)",
        ),
    ];

    for (name, help) in JEMALLOC_GAUGES {
        describe_gauge!(name, Unit::Bytes, help);
    }
}

/// No-op fallback used when jemalloc statistics are not compiled in (the `jemalloc` feature is
/// disabled or the target is not unix).
#[cfg(not(all(feature = "jemalloc", unix)))]
const fn describe_memory_stats() {}
273
274#[cfg(target_os = "linux")]
275fn describe_io_stats() {
276 use metrics::describe_counter;
277
278 describe_counter!("io.rchar", "Characters read");
279 describe_counter!("io.wchar", "Characters written");
280 describe_counter!("io.syscr", "Read syscalls");
281 describe_counter!("io.syscw", "Write syscalls");
282 describe_counter!("io.read_bytes", Unit::Bytes, "Bytes read");
283 describe_counter!("io.write_bytes", Unit::Bytes, "Bytes written");
284 describe_counter!("io.cancelled_write_bytes", Unit::Bytes, "Cancelled write bytes");
285}
286
287#[cfg(not(target_os = "linux"))]
288const fn describe_io_stats() {}
289
#[cfg(test)]
mod tests {
    use super::*;
    use reqwest::Client;
    use reth_tasks::TaskManager;
    use socket2::{Domain, Socket, Type};
    use std::net::{SocketAddr, TcpListener};

    /// Binds a throwaway listener on an ephemeral loopback port and returns the address it got.
    fn get_random_available_addr() -> SocketAddr {
        let any_port: SocketAddr = "127.0.0.1:0".parse().unwrap();
        let socket = Socket::new(Domain::IPV4, Type::STREAM, None).unwrap();
        socket.set_reuse_address(true).unwrap();
        socket.bind(&any_port.into()).unwrap();
        socket.listen(1).unwrap();
        TcpListener::from(socket).local_addr().unwrap()
    }

    /// Starts a server and checks that the endpoint answers with process metrics.
    #[tokio::test]
    async fn test_metrics_endpoint() {
        let tasks = TaskManager::current();
        let listen_addr = get_random_available_addr();

        let config = MetricServerConfig::new(
            listen_addr,
            VersionInfo {
                version: "test",
                build_timestamp: "test",
                cargo_features: "test",
                git_sha: "test",
                target_triple: "test",
                build_profile: "test",
            },
            ChainSpecInfo { name: "test".to_string() },
            tasks.executor(),
            Hooks::builder().build(),
        );
        MetricServer::new(config).serve().await.unwrap();

        let response = Client::new().get(format!("http://{listen_addr}")).send().await.unwrap();
        assert!(response.status().is_success());

        let body = response.text().await.unwrap();
        for expected in ["reth_process_cpu_seconds_total", "reth_process_start_time_seconds"] {
            assert!(body.contains(expected));
        }
    }
}