1use crate::{
2 chain::ChainSpecInfo,
3 hooks::{Hook, Hooks},
4 process::register_process_metrics,
5 recorder::install_prometheus_recorder,
6 version::VersionInfo,
7};
8use bytes::Bytes;
9use eyre::WrapErr;
10use http::{header::CONTENT_TYPE, HeaderValue, Request, Response, StatusCode};
11use http_body_util::Full;
12use metrics::describe_gauge;
13use metrics_process::Collector;
14use reqwest::Client;
15use reth_metrics::metrics::Unit;
16use reth_tasks::TaskExecutor;
17use std::{convert::Infallible, net::SocketAddr, path::PathBuf, sync::Arc, time::Duration};
18
/// Configuration for the [`MetricServer`]: listen address, metadata to expose,
/// and optional push-gateway settings.
#[derive(Debug)]
pub struct MetricServerConfig {
    // Address the Prometheus HTTP endpoint binds to.
    listen_addr: SocketAddr,
    // Build/version metadata registered as metrics on startup.
    version_info: VersionInfo,
    // Chain-spec metadata registered as metrics on startup.
    chain_spec_info: ChainSpecInfo,
    // Executor used to spawn the endpoint and push-gateway tasks.
    task_executor: TaskExecutor,
    // Hooks run before every metrics render (endpoint and push task).
    hooks: Hooks,
    // When `Some`, rendered metrics are periodically `PUT` to this URL.
    push_gateway_url: Option<String>,
    // Interval between push-gateway uploads (defaults to 5s in `new`).
    push_gateway_interval: Duration,
    // Directory where jemalloc heap-profile dumps are written.
    pprof_dump_dir: PathBuf,
}
31
32impl MetricServerConfig {
33 pub const fn new(
35 listen_addr: SocketAddr,
36 version_info: VersionInfo,
37 chain_spec_info: ChainSpecInfo,
38 task_executor: TaskExecutor,
39 hooks: Hooks,
40 pprof_dump_dir: PathBuf,
41 ) -> Self {
42 Self {
43 listen_addr,
44 hooks,
45 task_executor,
46 version_info,
47 chain_spec_info,
48 push_gateway_url: None,
49 push_gateway_interval: Duration::from_secs(5),
50 pprof_dump_dir,
51 }
52 }
53
54 pub fn with_push_gateway(mut self, url: Option<String>, interval: Duration) -> Self {
56 self.push_gateway_url = url;
57 self.push_gateway_interval = interval;
58 self
59 }
60}
61
/// Prometheus metrics server.
///
/// Spawns the HTTP endpoint (and the optional push-gateway task) once
/// [`Self::serve`] is called.
#[derive(Debug)]
pub struct MetricServer {
    // Server configuration; read by `serve`.
    config: MetricServerConfig,
}
67
impl MetricServer {
    /// Creates a new [`MetricServer`] with the given configuration.
    pub const fn new(config: MetricServerConfig) -> Self {
        Self { config }
    }

    /// Starts the metrics endpoint (and, if configured, the push-gateway task)
    /// on the configured task executor, then registers metric descriptions and
    /// the static version/chain/process metrics.
    ///
    /// # Errors
    ///
    /// Returns an error if the endpoint cannot bind to `listen_addr` or the
    /// push-gateway HTTP client cannot be constructed.
    pub async fn serve(&self) -> eyre::Result<()> {
        let MetricServerConfig {
            listen_addr,
            hooks,
            task_executor,
            version_info,
            chain_spec_info,
            push_gateway_url,
            push_gateway_interval,
            pprof_dump_dir,
        } = &self.config;

        // Bundle all registered hooks into a single callable that runs before
        // every metrics render, so reported values are current.
        let hooks_for_endpoint = hooks.clone();
        self.start_endpoint(
            *listen_addr,
            Arc::new(move || hooks_for_endpoint.iter().for_each(|hook| hook())),
            task_executor.clone(),
            pprof_dump_dir.clone(),
        )
        .await
        .wrap_err_with(|| format!("Could not start Prometheus endpoint at {listen_addr}"))?;

        // Optionally mirror the metrics to a push gateway on a fixed interval.
        if let Some(url) = push_gateway_url {
            self.start_push_gateway_task(
                url.clone(),
                *push_gateway_interval,
                hooks.clone(),
                task_executor.clone(),
            )?;
        }

        // Register descriptions (help text / units) so rendered metrics carry
        // proper metadata.
        describe_db_metrics();
        describe_static_file_metrics();
        describe_rocksdb_metrics();
        Collector::default().describe();
        describe_memory_stats();
        describe_io_stats();

        // One-shot registration of static metrics.
        version_info.register_version_metrics();
        chain_spec_info.register_chain_spec_metrics();
        register_process_metrics();

        Ok(())
    }

    /// Binds `listen_addr` and spawns an accept loop on `task_executor` that
    /// serves metrics and debug endpoints until graceful shutdown.
    ///
    /// `hook` is invoked before every metrics render.
    ///
    /// # Errors
    ///
    /// Returns an error if the listener cannot bind to `listen_addr`.
    async fn start_endpoint<F: Hook + 'static>(
        &self,
        listen_addr: SocketAddr,
        hook: Arc<F>,
        task_executor: TaskExecutor,
        pprof_dump_dir: PathBuf,
    ) -> eyre::Result<()> {
        let listener = tokio::net::TcpListener::bind(listen_addr)
            .await
            .wrap_err("Could not bind to address")?;

        tracing::info!(target: "reth::cli", "Starting metrics endpoint at {}", listener.local_addr().unwrap());

        task_executor.spawn_with_graceful_shutdown_signal(async move |mut signal| loop {
            // Accept the next connection; exit the loop as soon as the
            // shutdown signal fires. Accept errors are logged and skipped.
            let io = tokio::select! {
                _ = &mut signal => break,
                io = listener.accept() => {
                    match io {
                        Ok((stream, _remote_addr)) => stream,
                        Err(err) => {
                            tracing::error!(%err, "failed to accept connection");
                            continue;
                        }
                    }
                }
            };

            let handle = install_prometheus_recorder();
            let hook = hook.clone();
            let pprof_dump_dir = pprof_dump_dir.clone();
            // Per-connection service: every request is routed by path through
            // `handle_request`; the service itself is infallible.
            let service = tower::service_fn(move |req: Request<_>| {
                let hook = hook.clone();
                let pprof_dump_dir = pprof_dump_dir.clone();
                async move {
                    let response =
                        handle_request(req.uri().path(), &*hook, handle, &pprof_dump_dir).await;
                    Ok::<_, Infallible>(response)
                }
            });

            // Serve each connection on its own task so a slow client cannot
            // stall the accept loop. NOTE(review): `ignore_guard` presumably
            // detaches the shutdown guard so in-flight connections don't block
            // executor shutdown — confirm against `reth_tasks` docs.
            let mut shutdown = signal.clone().ignore_guard();
            tokio::task::spawn(async move {
                let _ = jsonrpsee_server::serve_with_graceful_shutdown(io, service, &mut shutdown)
                    .await
                    .inspect_err(|error| tracing::debug!(%error, "failed to serve request"));
            });
        });

        Ok(())
    }

    /// Spawns a background task that renders the current metrics every
    /// `interval` and `PUT`s them to the push gateway at `url` until graceful
    /// shutdown. Push failures are logged at `warn` and retried on the next
    /// tick rather than aborting the task.
    ///
    /// # Errors
    ///
    /// Returns an error if the HTTP client cannot be constructed.
    fn start_push_gateway_task(
        &self,
        url: String,
        interval: Duration,
        hooks: Hooks,
        task_executor: TaskExecutor,
    ) -> eyre::Result<()> {
        let client = Client::builder()
            .build()
            .wrap_err("Could not create HTTP client to push metrics to gateway")?;
        task_executor.spawn_with_graceful_shutdown_signal(async move |mut signal| {
            tracing::info!(url = %url, interval = ?interval, "Starting task to push metrics to gateway");
            let handle = install_prometheus_recorder();
            loop {
                tokio::select! {
                    _ = &mut signal => {
                        tracing::info!("Shutting down task to push metrics to gateway");
                        break;
                    }
                    _ = tokio::time::sleep(interval) => {
                        // Run the hooks so gauges are current, then render and push.
                        hooks.iter().for_each(|hook| hook());
                        let metrics = handle.handle().render();
                        match client.put(&url).header("Content-Type", "text/plain").body(metrics).send().await {
                            Ok(response) => {
                                if !response.status().is_success() {
                                    tracing::warn!(
                                        status = %response.status(),
                                        "Failed to push metrics to gateway"
                                    );
                                }
                            }
                            Err(err) => {
                                tracing::warn!(%err, "Failed to push metrics to gateway");
                            }
                        }
                    }
                }
            }
        });
        Ok(())
    }
}
216
/// Registers help text and units for the database gauges.
fn describe_db_metrics() {
    describe_gauge!("db.table_size", Unit::Bytes, "The size of a database table (in bytes)");
    describe_gauge!("db.table_pages", "The number of database pages for a table");
    describe_gauge!("db.table_entries", "The number of entries for a table");
    describe_gauge!("db.freelist", "The number of pages on the freelist");
    describe_gauge!("db.page_size", Unit::Bytes, "The size of a database page (in bytes)");
    describe_gauge!(
        "db.timed_out_not_aborted_transactions",
        "Number of timed out transactions that were not aborted by the user yet"
    );
}
228
/// Registers help text and units for the static-file segment gauges.
fn describe_static_file_metrics() {
    describe_gauge!("static_files.segment_size", Unit::Bytes, "The size of a static file segment");
    describe_gauge!("static_files.segment_files", "The number of files for a static file segment");
    describe_gauge!(
        "static_files.segment_entries",
        "The number of entries for a static file segment"
    );
}
237
/// Registers help text and units for the RocksDB gauges.
fn describe_rocksdb_metrics() {
    describe_gauge!(
        "rocksdb.table_size",
        Unit::Bytes,
        "The estimated size of a RocksDB table (SST + memtable)"
    );
    describe_gauge!("rocksdb.table_entries", "The estimated number of keys in a RocksDB table");
    describe_gauge!(
        "rocksdb.pending_compaction_bytes",
        Unit::Bytes,
        "Bytes pending compaction for a RocksDB table"
    );
    describe_gauge!("rocksdb.sst_size", Unit::Bytes, "The size of SST files for a RocksDB table");
    describe_gauge!(
        "rocksdb.memtable_size",
        Unit::Bytes,
        "The size of memtables for a RocksDB table"
    );
    describe_gauge!(
        "rocksdb.wal_size",
        Unit::Bytes,
        "The total size of WAL (Write-Ahead Log) files. Important: this is not included in table_size or sst_size metrics"
    );
}
262
/// Registers help text and units for the jemalloc memory gauges.
///
/// Compiled only on unix with the `jemalloc` feature; see the no-op fallback
/// below for other targets.
#[cfg(all(feature = "jemalloc", unix))]
fn describe_memory_stats() {
    describe_gauge!(
        "jemalloc.active",
        Unit::Bytes,
        "Total number of bytes in active pages allocated by the application"
    );
    describe_gauge!(
        "jemalloc.allocated",
        Unit::Bytes,
        "Total number of bytes allocated by the application"
    );
    describe_gauge!(
        "jemalloc.mapped",
        Unit::Bytes,
        "Total number of bytes in active extents mapped by the allocator"
    );
    describe_gauge!(
        "jemalloc.metadata",
        Unit::Bytes,
        "Total number of bytes dedicated to jemalloc metadata"
    );
    describe_gauge!(
        "jemalloc.resident",
        Unit::Bytes,
        "Total number of bytes in physically resident data pages mapped by the allocator"
    );
    describe_gauge!(
        "jemalloc.retained",
        Unit::Bytes,
        "Total number of bytes in virtual memory mappings that were retained rather than \
        being returned to the operating system via e.g. munmap(2)"
    );
}
297
/// No-op fallback when jemalloc memory stats are unavailable (non-unix target
/// or `jemalloc` feature disabled).
#[cfg(not(all(feature = "jemalloc", unix)))]
const fn describe_memory_stats() {}
300
/// Registers help text and units for the Linux per-process I/O counters.
#[cfg(target_os = "linux")]
fn describe_io_stats() {
    use metrics::describe_counter;

    describe_counter!("io.rchar", "Characters read");
    describe_counter!("io.wchar", "Characters written");
    describe_counter!("io.syscr", "Read syscalls");
    describe_counter!("io.syscw", "Write syscalls");
    describe_counter!("io.read_bytes", Unit::Bytes, "Bytes read");
    describe_counter!("io.write_bytes", Unit::Bytes, "Bytes written");
    describe_counter!("io.cancelled_write_bytes", Unit::Bytes, "Cancelled write bytes");
}
313
/// No-op fallback: the I/O counters above are only available on Linux.
#[cfg(not(target_os = "linux"))]
const fn describe_io_stats() {}
316
317async fn handle_request(
318 path: &str,
319 hook: impl Fn(),
320 handle: &crate::recorder::PrometheusRecorder,
321 pprof_dump_dir: &PathBuf,
322) -> Response<Full<Bytes>> {
323 match path {
324 "/debug/pprof/heap" => handle_pprof_heap(pprof_dump_dir),
325 "/debug/tokio/dump" => handle_tokio_dump().await,
326 _ => {
327 hook();
328 let metrics = handle.handle().render();
329 let mut response = Response::new(Full::new(Bytes::from(metrics)));
330 response.headers_mut().insert(CONTENT_TYPE, HeaderValue::from_static("text/plain"));
331 response
332 }
333 }
334}
335
/// Handles `/debug/pprof/heap`: dumps a jemalloc heap profile and returns it
/// as an octet-stream body with `Content-Encoding: gzip`.
///
/// Responds 503 if another dump is in progress and 500 if profiling is not
/// enabled at runtime or the dump fails.
#[cfg(all(feature = "jemalloc-prof", unix))]
fn handle_pprof_heap(pprof_dump_dir: &PathBuf) -> Response<Full<Bytes>> {
    use http::header::CONTENT_ENCODING;

    match jemalloc_pprof::PROF_CTL.as_ref() {
        // NOTE: the `MutexGuard` temporary produced by `try_lock()` lives
        // until the end of this inner `match`, so the lock is held across the
        // dump below — concurrent dump requests are serialized and the loser
        // gets the 503 branch.
        Some(prof_ctl) => match prof_ctl.try_lock() {
            Ok(_) => match jemalloc_pprof_dump(pprof_dump_dir) {
                Ok(pprof) => {
                    let mut response = Response::new(Full::new(Bytes::from(pprof)));
                    response
                        .headers_mut()
                        .insert(CONTENT_TYPE, HeaderValue::from_static("application/octet-stream"));
                    response
                        .headers_mut()
                        .insert(CONTENT_ENCODING, HeaderValue::from_static("gzip"));
                    response
                }
                Err(err) => {
                    let mut response = Response::new(Full::new(Bytes::from(format!(
                        "Failed to dump pprof: {err}"
                    ))));
                    *response.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
                    response
                }
            },
            // Lock already held: a dump is running in another request.
            Err(_) => {
                let mut response = Response::new(Full::new(Bytes::from_static(
                    b"Profile dump already in progress. Try again later.",
                )));
                *response.status_mut() = StatusCode::SERVICE_UNAVAILABLE;
                response
            }
        },
        // `PROF_CTL` is `None` when jemalloc profiling was not activated.
        None => {
            let mut response = Response::new(Full::new(Bytes::from_static(
                b"jemalloc profiling not enabled. \
                Set MALLOC_CONF=prof:true or rebuild with jemalloc-prof feature.",
            )));
            *response.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
            response
        }
    }
}
379
/// Triggers a jemalloc heap-profile dump into a temp file under
/// `pprof_dump_dir`, parses the dump, and converts it to pprof format.
///
/// # Errors
///
/// Returns an error if the directory/temp file cannot be created, the
/// `prof.dump` mallctl call fails, or the dump cannot be parsed.
#[cfg(all(feature = "jemalloc-prof", unix))]
fn jemalloc_pprof_dump(pprof_dump_dir: &PathBuf) -> eyre::Result<Vec<u8>> {
    use std::{ffi::CString, io::BufReader};

    use mappings::MAPPINGS;
    use pprof_util::parse_jeheap;
    use tempfile::NamedTempFile;

    reth_fs_util::create_dir_all(pprof_dump_dir)?;
    // Dump into a fresh temp file that is deleted when `f` is dropped.
    let f = NamedTempFile::new_in(pprof_dump_dir)?;
    // unwrap: unix paths cannot contain interior NUL bytes.
    let path = CString::new(f.path().as_os_str().as_encoded_bytes()).unwrap();

    // SAFETY: `b"prof.dump\0"` is a valid NUL-terminated mallctl name and
    // `path` is a valid NUL-terminated C string that outlives the call.
    unsafe { tikv_jemalloc_ctl::raw::write(b"prof.dump\0", path.as_ptr()) }?;

    let dump_reader = BufReader::new(f);
    let profile =
        parse_jeheap(dump_reader, MAPPINGS.as_deref()).map_err(|err| eyre::eyre!(Box::new(err)))?;
    let pprof = profile.to_pprof(("inuse_space", "bytes"), ("space", "bytes"), None);

    Ok(pprof)
}
405
406#[cfg(not(all(feature = "jemalloc-prof", unix)))]
407fn handle_pprof_heap(_pprof_dump_dir: &PathBuf) -> Response<Full<Bytes>> {
408 let mut response = Response::new(Full::new(Bytes::from_static(
409 b"jemalloc pprof support not compiled. Rebuild with the jemalloc-prof feature.",
410 )));
411 *response.status_mut() = StatusCode::NOT_IMPLEMENTED;
412 response
413}
414
/// Handles `/debug/tokio/dump`: captures a task dump of the current tokio
/// runtime and renders one backtrace per task as plain text.
///
/// Only available when built with `--cfg tokio_unstable`; see the fallback
/// below otherwise.
#[cfg(tokio_unstable)]
async fn handle_tokio_dump() -> Response<Full<Bytes>> {
    use std::fmt::Write;

    let handle = tokio::runtime::Handle::current();
    let dump = handle.dump().await;

    let mut output = String::new();
    for (i, task) in dump.tasks().iter().enumerate() {
        let trace = task.trace();
        // `write!` into a `String` never fails; this avoids the per-task
        // intermediate allocation that `push_str(&format!(..))` would make.
        let _ = write!(output, "task {i}:\n{trace}\n\n");
    }

    let mut response = Response::new(Full::new(Bytes::from(output)));
    response.headers_mut().insert(CONTENT_TYPE, HeaderValue::from_static("text/plain"));
    response
}
430
431#[cfg(not(tokio_unstable))]
432async fn handle_tokio_dump() -> Response<Full<Bytes>> {
433 let mut response = Response::new(Full::new(Bytes::from_static(
434 b"tokio task dump not available. Rebuild with RUSTFLAGS=\"--cfg tokio_unstable\" and tokio's `taskdump` feature.",
435 )));
436 *response.status_mut() = StatusCode::NOT_IMPLEMENTED;
437 response
438}
439
#[cfg(test)]
mod tests {
    use super::*;
    use reqwest::Client;
    use reth_tasks::Runtime;
    use socket2::{Domain, Socket, Type};
    use std::net::{SocketAddr, TcpListener};

    /// Picks a loopback address with an OS-assigned free port.
    ///
    /// NOTE(review): the temporary listener is dropped on return, so another
    /// process could claim the port before the server binds it (small TOCTOU
    /// window); acceptable for a test helper.
    fn get_random_available_addr() -> SocketAddr {
        let addr = &"127.0.0.1:0".parse::<SocketAddr>().unwrap().into();
        let socket = Socket::new(Domain::IPV4, Type::STREAM, None).unwrap();
        socket.set_reuse_address(true).unwrap();
        socket.bind(addr).unwrap();
        socket.listen(1).unwrap();
        let listener = TcpListener::from(socket);
        listener.local_addr().unwrap()
    }

    /// End-to-end check: start the server, GET the endpoint, and verify the
    /// rendered body contains the expected process metrics.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_metrics_endpoint() {
        // Install the global recorder before serving so registrations in
        // `serve` land in it.
        install_prometheus_recorder();

        let chain_spec_info = ChainSpecInfo { name: "test".to_string() };
        let version_info = VersionInfo {
            version: "test",
            build_timestamp: "test",
            cargo_features: "test",
            git_sha: "test",
            target_triple: "test",
            build_profile: "test",
        };

        let runtime = Runtime::test();

        // No extra hooks: we only exercise the default render path.
        let hooks = Hooks::builder().build();

        let listen_addr = get_random_available_addr();
        let config = MetricServerConfig::new(
            listen_addr,
            version_info,
            chain_spec_info,
            runtime.clone(),
            hooks,
            std::env::temp_dir(),
        );

        MetricServer::new(config).serve().await.unwrap();

        let url = format!("http://{listen_addr}");
        let response = Client::new().get(&url).send().await.unwrap();
        assert!(response.status().is_success());

        let body = response.text().await.unwrap();
        assert!(body.contains("reth_process_cpu_seconds_total"));
        assert!(body.contains("reth_process_start_time_seconds"));
        assert!(body.contains("process_cli_args"), "expected process_cli_args metric in output");

        // Keep the runtime (and its spawned server task) alive until the
        // assertions above have run.
        drop(runtime);
    }
}
504}