1use crate::{
2 chain::ChainSpecInfo,
3 hooks::{Hook, Hooks},
4 process::register_process_metrics,
5 recorder::install_prometheus_recorder,
6 storage::StorageSettingsInfo,
7 version::VersionInfo,
8};
9use bytes::Bytes;
10use eyre::WrapErr;
11use http::{header::CONTENT_TYPE, HeaderValue, Request, Response, StatusCode};
12use http_body_util::Full;
13use metrics::describe_gauge;
14use metrics_process::Collector;
15use reqwest::Client;
16use reth_metrics::metrics::Unit;
17use reth_tasks::TaskExecutor;
18use std::{convert::Infallible, net::SocketAddr, path::PathBuf, sync::Arc, time::Duration};
19
/// Configuration for the [`MetricServer`].
#[derive(Debug)]
pub struct MetricServerConfig {
    /// Address the Prometheus HTTP endpoint binds to.
    listen_addr: SocketAddr,
    /// Version/build information; its metrics are registered when the server starts.
    version_info: VersionInfo,
    /// Chain spec information; its metrics are registered when the server starts.
    chain_spec_info: ChainSpecInfo,
    /// Optional storage settings; when set, its metrics are registered when the server starts.
    storage_settings_info: Option<StorageSettingsInfo>,
    /// Executor used to spawn the endpoint accept loop and the push-gateway task.
    task_executor: TaskExecutor,
    /// Hooks invoked before rendering metrics, for both scrapes and gateway pushes.
    hooks: Hooks,
    /// Optional push-gateway URL; when set, rendered metrics are periodically `PUT` to it.
    push_gateway_url: Option<String>,
    /// Interval between pushes to the gateway (defaults to 5s, see [`Self::new`]).
    push_gateway_interval: Duration,
    /// Directory where jemalloc heap-profile dump files are written.
    pprof_dump_dir: PathBuf,
}
33
34impl MetricServerConfig {
35 pub const fn new(
37 listen_addr: SocketAddr,
38 version_info: VersionInfo,
39 chain_spec_info: ChainSpecInfo,
40 task_executor: TaskExecutor,
41 hooks: Hooks,
42 pprof_dump_dir: PathBuf,
43 ) -> Self {
44 Self {
45 listen_addr,
46 hooks,
47 task_executor,
48 version_info,
49 chain_spec_info,
50 storage_settings_info: None,
51 push_gateway_url: None,
52 push_gateway_interval: Duration::from_secs(5),
53 pprof_dump_dir,
54 }
55 }
56
57 pub fn with_storage_settings_info(mut self, info: StorageSettingsInfo) -> Self {
59 self.storage_settings_info = Some(info);
60 self
61 }
62
63 pub fn with_push_gateway(mut self, url: Option<String>, interval: Duration) -> Self {
65 self.push_gateway_url = url;
66 self.push_gateway_interval = interval;
67 self
68 }
69}
70
/// Serves the Prometheus metrics HTTP endpoint and the optional push-gateway task.
#[derive(Debug)]
pub struct MetricServer {
    /// Server configuration; consumed by [`Self::serve`].
    config: MetricServerConfig,
}
76
impl MetricServer {
    /// Creates a new [`MetricServer`] with the given configuration.
    pub const fn new(config: MetricServerConfig) -> Self {
        Self { config }
    }

    /// Spawns the metrics endpoint (and, if configured, the push-gateway task), then
    /// registers metric descriptions and one-shot "static info" metrics.
    ///
    /// # Errors
    /// Returns an error if the endpoint cannot bind to the configured address or the
    /// push-gateway HTTP client cannot be built.
    pub async fn serve(&self) -> eyre::Result<()> {
        let MetricServerConfig {
            listen_addr,
            hooks,
            task_executor,
            version_info,
            chain_spec_info,
            storage_settings_info,
            push_gateway_url,
            push_gateway_interval,
            pprof_dump_dir,
        } = &self.config;

        // The endpoint runs every hook before rendering so scraped values are fresh.
        let hooks_for_endpoint = hooks.clone();
        self.start_endpoint(
            *listen_addr,
            Arc::new(move || hooks_for_endpoint.iter().for_each(|hook| hook())),
            task_executor.clone(),
            pprof_dump_dir.clone(),
        )
        .await
        .wrap_err_with(|| format!("Could not start Prometheus endpoint at {listen_addr}"))?;

        // Optionally push rendered metrics to a Prometheus push gateway on a timer.
        if let Some(url) = push_gateway_url {
            self.start_push_gateway_task(
                url.clone(),
                *push_gateway_interval,
                hooks.clone(),
                task_executor.clone(),
            )?;
        }

        // Register human-readable descriptions for gauges/counters recorded elsewhere.
        describe_db_metrics();
        describe_static_file_metrics();
        describe_rocksdb_metrics();
        Collector::default().describe();
        describe_memory_stats();
        describe_io_stats();

        // One-shot info metrics: version, chain spec, optional storage settings, process.
        version_info.register_version_metrics();
        chain_spec_info.register_chain_spec_metrics();
        if let Some(storage_settings_info) = storage_settings_info {
            storage_settings_info.register_storage_settings_metrics();
        }
        register_process_metrics();

        Ok(())
    }

    /// Binds `listen_addr` and spawns an accept loop that serves each connection with a
    /// tower service dispatching to [`handle_request`].
    ///
    /// The accept loop exits when the graceful-shutdown signal fires. Each accepted
    /// connection is served on its own spawned task so a slow client cannot stall the
    /// accept loop.
    async fn start_endpoint<F: Hook + 'static>(
        &self,
        listen_addr: SocketAddr,
        hook: Arc<F>,
        task_executor: TaskExecutor,
        pprof_dump_dir: PathBuf,
    ) -> eyre::Result<()> {
        let listener = tokio::net::TcpListener::bind(listen_addr)
            .await
            .wrap_err("Could not bind to address")?;

        tracing::info!(target: "reth::cli", "Starting metrics endpoint at {}", listener.local_addr().unwrap());

        task_executor.spawn_with_graceful_shutdown_signal(async move |mut signal| loop {
            // Wait for the next connection, bailing out on shutdown. A failed accept is
            // logged and skipped rather than terminating the endpoint.
            let io = tokio::select! {
                _ = &mut signal => break,
                io = listener.accept() => {
                    match io {
                        Ok((stream, _remote_addr)) => stream,
                        Err(err) => {
                            tracing::error!(%err, "failed to accept connection");
                            continue;
                        }
                    }
                }
            };

            let handle = install_prometheus_recorder();
            let hook = hook.clone();
            let pprof_dump_dir = pprof_dump_dir.clone();
            // Per-connection service: clone the hook and dump dir into each request future.
            let service = tower::service_fn(move |req: Request<_>| {
                let hook = hook.clone();
                let pprof_dump_dir = pprof_dump_dir.clone();
                async move {
                    let response =
                        handle_request(req.uri().path(), &*hook, handle, &pprof_dump_dir).await;
                    Ok::<_, Infallible>(response)
                }
            });

            // NOTE(review): `ignore_guard` presumably detaches the shutdown guard so an
            // in-flight connection does not block node shutdown — confirm in `reth_tasks`.
            let mut shutdown = signal.clone().ignore_guard();
            tokio::task::spawn(async move {
                let _ = jsonrpsee_server::serve_with_graceful_shutdown(io, service, &mut shutdown)
                    .await
                    .inspect_err(|error| tracing::debug!(%error, "failed to serve request"));
            });
        });

        Ok(())
    }

    /// Spawns a task that, every `interval`, runs all hooks, renders the metrics, and
    /// HTTP `PUT`s them to `url`, until the graceful-shutdown signal fires.
    ///
    /// Push failures are logged as warnings and do not stop the task.
    ///
    /// # Errors
    /// Returns an error if the HTTP client cannot be constructed.
    fn start_push_gateway_task(
        &self,
        url: String,
        interval: Duration,
        hooks: Hooks,
        task_executor: TaskExecutor,
    ) -> eyre::Result<()> {
        let client = Client::builder()
            .build()
            .wrap_err("Could not create HTTP client to push metrics to gateway")?;
        task_executor.spawn_with_graceful_shutdown_signal(async move |mut signal| {
            tracing::info!(url = %url, interval = ?interval, "Starting task to push metrics to gateway");
            let handle = install_prometheus_recorder();
            loop {
                tokio::select! {
                    _ = &mut signal => {
                        tracing::info!("Shutting down task to push metrics to gateway");
                        break;
                    }
                    _ = tokio::time::sleep(interval) => {
                        // Refresh metric values, then push the rendered exposition text.
                        hooks.iter().for_each(|hook| hook());
                        let metrics = handle.handle().render();
                        match client.put(&url).header("Content-Type", "text/plain").body(metrics).send().await {
                            Ok(response) => {
                                if !response.status().is_success() {
                                    tracing::warn!(
                                        status = %response.status(),
                                        "Failed to push metrics to gateway"
                                    );
                                }
                            }
                            Err(err) => {
                                tracing::warn!(%err, "Failed to push metrics to gateway");
                            }
                        }
                    }
                }
            }
        });
        Ok(())
    }
}
229
230fn describe_db_metrics() {
231 describe_gauge!("db.table_size", Unit::Bytes, "The size of a database table (in bytes)");
232 describe_gauge!("db.table_pages", "The number of database pages for a table");
233 describe_gauge!("db.table_entries", "The number of entries for a table");
234 describe_gauge!("db.freelist", "The number of pages on the freelist");
235 describe_gauge!("db.page_size", Unit::Bytes, "The size of a database page (in bytes)");
236 describe_gauge!(
237 "db.timed_out_not_aborted_transactions",
238 "Number of timed out transactions that were not aborted by the user yet"
239 );
240}
241
242fn describe_static_file_metrics() {
243 describe_gauge!("static_files.segment_size", Unit::Bytes, "The size of a static file segment");
244 describe_gauge!("static_files.segment_files", "The number of files for a static file segment");
245 describe_gauge!(
246 "static_files.segment_entries",
247 "The number of entries for a static file segment"
248 );
249}
250
251fn describe_rocksdb_metrics() {
252 describe_gauge!(
253 "rocksdb.table_size",
254 Unit::Bytes,
255 "The estimated size of a RocksDB table (SST + memtable)"
256 );
257 describe_gauge!("rocksdb.table_entries", "The estimated number of keys in a RocksDB table");
258 describe_gauge!(
259 "rocksdb.pending_compaction_bytes",
260 Unit::Bytes,
261 "Bytes pending compaction for a RocksDB table"
262 );
263 describe_gauge!("rocksdb.sst_size", Unit::Bytes, "The size of SST files for a RocksDB table");
264 describe_gauge!(
265 "rocksdb.memtable_size",
266 Unit::Bytes,
267 "The size of memtables for a RocksDB table"
268 );
269 describe_gauge!(
270 "rocksdb.wal_size",
271 Unit::Bytes,
272 "The total size of WAL (Write-Ahead Log) files. Important: this is not included in table_size or sst_size metrics"
273 );
274}
275
/// Registers descriptions for the jemalloc memory gauges (all measured in bytes).
#[cfg(all(feature = "jemalloc", unix))]
fn describe_memory_stats() {
    for (name, description) in [
        ("jemalloc.active", "Total number of bytes in active pages allocated by the application"),
        ("jemalloc.allocated", "Total number of bytes allocated by the application"),
        ("jemalloc.mapped", "Total number of bytes in active extents mapped by the allocator"),
        ("jemalloc.metadata", "Total number of bytes dedicated to jemalloc metadata"),
        (
            "jemalloc.resident",
            "Total number of bytes in physically resident data pages mapped by the allocator",
        ),
        (
            "jemalloc.retained",
            "Total number of bytes in virtual memory mappings that were retained rather than \
             being returned to the operating system via e.g. munmap(2)",
        ),
    ] {
        describe_gauge!(name, Unit::Bytes, description);
    }
}

/// No-op stand-in when jemalloc statistics are not compiled in.
#[cfg(not(all(feature = "jemalloc", unix)))]
const fn describe_memory_stats() {}
313
314#[cfg(target_os = "linux")]
315fn describe_io_stats() {
316 use metrics::describe_counter;
317
318 describe_counter!("io.rchar", "Characters read");
319 describe_counter!("io.wchar", "Characters written");
320 describe_counter!("io.syscr", "Read syscalls");
321 describe_counter!("io.syscw", "Write syscalls");
322 describe_counter!("io.read_bytes", Unit::Bytes, "Bytes read");
323 describe_counter!("io.write_bytes", Unit::Bytes, "Bytes written");
324 describe_counter!("io.cancelled_write_bytes", Unit::Bytes, "Cancelled write bytes");
325}
326
327#[cfg(not(target_os = "linux"))]
328const fn describe_io_stats() {}
329
330async fn handle_request(
331 path: &str,
332 hook: impl Fn(),
333 handle: &crate::recorder::PrometheusRecorder,
334 pprof_dump_dir: &PathBuf,
335) -> Response<Full<Bytes>> {
336 match path {
337 "/debug/pprof/heap" => handle_pprof_heap(pprof_dump_dir),
338 "/debug/tokio/dump" => handle_tokio_dump().await,
339 _ => {
340 hook();
341 let metrics = handle.handle().render();
342 let mut response = Response::new(Full::new(Bytes::from(metrics)));
343 response.headers_mut().insert(CONTENT_TYPE, HeaderValue::from_static("text/plain"));
344 response
345 }
346 }
347}
348
/// Serves a jemalloc heap profile in pprof format.
///
/// Responds with:
/// - 200 + `application/octet-stream` (`Content-Encoding: gzip`) on success,
/// - 503 when another dump is already in progress (the profiler lock is held),
/// - 500 when profiling is unavailable or the dump fails.
#[cfg(all(feature = "jemalloc-prof", unix))]
fn handle_pprof_heap(pprof_dump_dir: &PathBuf) -> Response<Full<Bytes>> {
    use http::header::CONTENT_ENCODING;

    match jemalloc_pprof::PROF_CTL.as_ref() {
        // NOTE: the guard returned by `try_lock` lives inside the match scrutinee
        // temporary, which is kept alive for the whole `Ok` arm — so the profiler lock is
        // held while dumping, serializing concurrent dump requests.
        Some(prof_ctl) => match prof_ctl.try_lock() {
            Ok(_) => match jemalloc_pprof_dump(pprof_dump_dir) {
                Ok(pprof) => {
                    let mut response = Response::new(Full::new(Bytes::from(pprof)));
                    response
                        .headers_mut()
                        .insert(CONTENT_TYPE, HeaderValue::from_static("application/octet-stream"));
                    response
                        .headers_mut()
                        .insert(CONTENT_ENCODING, HeaderValue::from_static("gzip"));
                    response
                }
                Err(err) => {
                    let mut response = Response::new(Full::new(Bytes::from(format!(
                        "Failed to dump pprof: {err}"
                    ))));
                    *response.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
                    response
                }
            },
            // Lock contention: a dump is already running.
            Err(_) => {
                let mut response = Response::new(Full::new(Bytes::from_static(
                    b"Profile dump already in progress. Try again later.",
                )));
                *response.status_mut() = StatusCode::SERVICE_UNAVAILABLE;
                response
            }
        },
        // `PROF_CTL` is absent when jemalloc profiling was not activated at runtime.
        None => {
            let mut response = Response::new(Full::new(Bytes::from_static(
                b"jemalloc profiling not enabled. \
                Set MALLOC_CONF=prof:true or rebuild with jemalloc-prof feature.",
            )));
            *response.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
            response
        }
    }
}
392
/// Triggers a jemalloc `prof.dump` into a temporary file under `pprof_dump_dir` and
/// converts the resulting heap profile to pprof-encoded bytes.
///
/// # Errors
/// Fails if the dump directory cannot be created, the temp file cannot be created, the
/// jemalloc control call fails, or the dump cannot be parsed.
#[cfg(all(feature = "jemalloc-prof", unix))]
fn jemalloc_pprof_dump(pprof_dump_dir: &PathBuf) -> eyre::Result<Vec<u8>> {
    use std::{ffi::CString, io::BufReader};

    use mappings::MAPPINGS;
    use pprof_util::parse_jeheap;
    use tempfile::NamedTempFile;

    reth_fs_util::create_dir_all(pprof_dump_dir)?;
    // The temp file is removed automatically when `f` is dropped at the end of this fn.
    let f = NamedTempFile::new_in(pprof_dump_dir)?;
    // `CString::new` only fails on interior NUL bytes, which a temp-file path will not
    // contain.
    let path = CString::new(f.path().as_os_str().as_encoded_bytes()).unwrap();

    // SAFETY: `prof.dump` expects a pointer to a NUL-terminated C string; `path` is a
    // valid `CString` that outlives the call.
    unsafe { tikv_jemalloc_ctl::raw::write(b"prof.dump\0", path.as_ptr()) }?;

    // Read back and parse the dump jemalloc just wrote to the temp file.
    let dump_reader = BufReader::new(f);
    let profile =
        parse_jeheap(dump_reader, MAPPINGS.as_deref()).map_err(|err| eyre::eyre!(Box::new(err)))?;
    let pprof = profile.to_pprof(("inuse_space", "bytes"), ("space", "bytes"), None);

    Ok(pprof)
}
418
419#[cfg(not(all(feature = "jemalloc-prof", unix)))]
420fn handle_pprof_heap(_pprof_dump_dir: &PathBuf) -> Response<Full<Bytes>> {
421 let mut response = Response::new(Full::new(Bytes::from_static(
422 b"jemalloc pprof support not compiled. Rebuild with the jemalloc-prof feature.",
423 )));
424 *response.status_mut() = StatusCode::NOT_IMPLEMENTED;
425 response
426}
427
/// Renders a stack-trace dump of every task in the current tokio runtime as plain text.
#[cfg(tokio_unstable)]
async fn handle_tokio_dump() -> Response<Full<Bytes>> {
    // `fmt::Write` lets us append formatted text straight into `output` instead of
    // allocating an intermediate `String` per task (as `push_str(&format!(..))` would).
    use std::fmt::Write;

    let handle = tokio::runtime::Handle::current();
    let dump = handle.dump().await;

    let mut output = String::new();
    for (i, task) in dump.tasks().iter().enumerate() {
        let trace = task.trace();
        // Writing to a `String` is infallible, so the `Result` can be ignored.
        let _ = write!(output, "task {i}:\n{trace}\n\n");
    }

    let mut response = Response::new(Full::new(Bytes::from(output)));
    response.headers_mut().insert(CONTENT_TYPE, HeaderValue::from_static("text/plain"));
    response
}
443
444#[cfg(not(tokio_unstable))]
445async fn handle_tokio_dump() -> Response<Full<Bytes>> {
446 let mut response = Response::new(Full::new(Bytes::from_static(
447 b"tokio task dump not available. Rebuild with RUSTFLAGS=\"--cfg tokio_unstable\" and tokio's `taskdump` feature.",
448 )));
449 *response.status_mut() = StatusCode::NOT_IMPLEMENTED;
450 response
451}
452
#[cfg(test)]
mod tests {
    use super::*;
    use reqwest::Client;
    use reth_tasks::Runtime;
    use socket2::{Domain, Socket, Type};
    use std::net::{SocketAddr, TcpListener};

    /// Picks a free local port by binding an ephemeral listener and returning its address.
    ///
    /// `SO_REUSEADDR` is set so the server can rebind the port right after the probe
    /// listener is dropped. NOTE(review): there is an inherent race between dropping the
    /// listener here and the server rebinding — acceptable for a test.
    fn get_random_available_addr() -> SocketAddr {
        let addr = &"127.0.0.1:0".parse::<SocketAddr>().unwrap().into();
        let socket = Socket::new(Domain::IPV4, Type::STREAM, None).unwrap();
        socket.set_reuse_address(true).unwrap();
        socket.bind(addr).unwrap();
        socket.listen(1).unwrap();
        let listener = TcpListener::from(socket);
        listener.local_addr().unwrap()
    }

    /// End-to-end check: starts the server, scrapes `/`, and asserts the response
    /// contains the expected process, version, and storage-settings metrics.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_metrics_endpoint() {
        // Ensure the global recorder exists before the server renders anything.
        install_prometheus_recorder();

        let chain_spec_info = ChainSpecInfo { name: "test".to_string() };
        let storage_settings_info = StorageSettingsInfo {
            storage_v2: true,
            pruning_mode: "archive",
            prune_config: r#"{"block_interval":5}"#.to_string(),
        };
        let version_info = VersionInfo {
            version: "test",
            build_timestamp: "test",
            cargo_features: "test",
            git_sha: "test",
            target_triple: "test",
            build_profile: "test",
        };

        let runtime = Runtime::test();

        let hooks = Hooks::builder().build();

        let listen_addr = get_random_available_addr();
        let config = MetricServerConfig::new(
            listen_addr,
            version_info,
            chain_spec_info,
            runtime.clone(),
            hooks,
            std::env::temp_dir(),
        )
        .with_storage_settings_info(storage_settings_info);

        MetricServer::new(config).serve().await.unwrap();

        // Scrape the endpoint like Prometheus would.
        let url = format!("http://{listen_addr}");
        let response = Client::new().get(&url).send().await.unwrap();
        assert!(response.status().is_success());

        // Spot-check a few metrics that `serve` is expected to have registered.
        let body = response.text().await.unwrap();
        assert!(body.contains("reth_process_cpu_seconds_total"));
        assert!(body.contains("reth_process_start_time_seconds"));
        assert!(body.contains("process_cli_args"), "expected process_cli_args metric in output");
        assert!(body.contains("reth_storage_settings"), "expected storage settings metric");
        assert!(body.contains("storage_v2=\"true\""), "expected storage v2 label");
        assert!(body.contains("pruning_mode=\"archive\""), "expected pruning mode label");
        assert!(body.contains("prune_config="), "expected prune config label");

        // Keep the runtime alive until after the request completes.
        drop(runtime);
    }
}
527}