use crate::{
    chain::ChainSpecInfo,
    hooks::{Hook, Hooks},
    recorder::install_prometheus_recorder,
    version::VersionInfo,
};
use bytes::Bytes;
use eyre::WrapErr;
use http::{header::CONTENT_TYPE, HeaderValue, Request, Response, StatusCode};
use http_body_util::Full;
use metrics::describe_gauge;
use metrics_process::Collector;
use reqwest::Client;
use reth_metrics::metrics::Unit;
use reth_tasks::TaskExecutor;
use std::{
    convert::Infallible,
    net::SocketAddr,
    path::{Path, PathBuf},
    sync::Arc,
    time::Duration,
};
17
/// Configuration for the metrics server, consumed by [`MetricServer::serve`].
#[derive(Debug)]
pub struct MetricServerConfig {
    /// Socket address the Prometheus HTTP endpoint listens on.
    listen_addr: SocketAddr,
    /// Build/version information registered as metrics when the server starts.
    version_info: VersionInfo,
    /// Chain-spec information registered as metrics when the server starts.
    chain_spec_info: ChainSpecInfo,
    /// Executor used to spawn the endpoint accept loop and the push-gateway task.
    task_executor: TaskExecutor,
    /// Hooks invoked before every metrics render (endpoint scrape or gateway push).
    hooks: Hooks,
    /// Optional push-gateway URL; when `Some`, rendered metrics are periodically `PUT` there.
    push_gateway_url: Option<String>,
    /// Interval between pushes to the gateway (defaults to 5s in [`Self::new`]).
    push_gateway_interval: Duration,
    /// Directory where jemalloc heap-profile dumps are written for `/debug/pprof/heap`.
    pprof_dump_dir: PathBuf,
}
30
31impl MetricServerConfig {
32 pub const fn new(
34 listen_addr: SocketAddr,
35 version_info: VersionInfo,
36 chain_spec_info: ChainSpecInfo,
37 task_executor: TaskExecutor,
38 hooks: Hooks,
39 pprof_dump_dir: PathBuf,
40 ) -> Self {
41 Self {
42 listen_addr,
43 hooks,
44 task_executor,
45 version_info,
46 chain_spec_info,
47 push_gateway_url: None,
48 push_gateway_interval: Duration::from_secs(5),
49 pprof_dump_dir,
50 }
51 }
52
53 pub fn with_push_gateway(mut self, url: Option<String>, interval: Duration) -> Self {
55 self.push_gateway_url = url;
56 self.push_gateway_interval = interval;
57 self
58 }
59}
60
/// Prometheus metrics server.
///
/// Serves the metrics HTTP endpoint (plus `/debug/pprof/heap` and `/debug/tokio/dump` debug
/// routes) and optionally pushes rendered metrics to a push gateway; see
/// [`MetricServer::serve`].
#[derive(Debug)]
pub struct MetricServer {
    /// Configuration applied when [`MetricServer::serve`] is called.
    config: MetricServerConfig,
}
66
impl MetricServer {
    /// Creates a new [`MetricServer`] from the given configuration.
    pub const fn new(config: MetricServerConfig) -> Self {
        Self { config }
    }

    /// Starts all metrics services described by the configuration:
    ///
    /// 1. binds and spawns the Prometheus HTTP endpoint on `listen_addr`,
    /// 2. if a push-gateway URL is configured, spawns the periodic push task,
    /// 3. registers metric descriptions (db, static files, RocksDB, process, memory, io) and
    ///    the version/chain-spec info metrics.
    ///
    /// # Errors
    ///
    /// Returns an error if the endpoint cannot be bound or the push-gateway HTTP client
    /// cannot be built.
    pub async fn serve(&self) -> eyre::Result<()> {
        let MetricServerConfig {
            listen_addr,
            hooks,
            task_executor,
            version_info,
            chain_spec_info,
            push_gateway_url,
            push_gateway_interval,
            pprof_dump_dir,
        } = &self.config;

        // The endpoint owns a clone of the hooks; they run on every scrape so gauges are
        // refreshed right before rendering.
        let hooks_for_endpoint = hooks.clone();
        self.start_endpoint(
            *listen_addr,
            Arc::new(move || hooks_for_endpoint.iter().for_each(|hook| hook())),
            task_executor.clone(),
            pprof_dump_dir.clone(),
        )
        .await
        .wrap_err_with(|| format!("Could not start Prometheus endpoint at {listen_addr}"))?;

        // Push-gateway task is optional and independent of the pull endpoint above.
        if let Some(url) = push_gateway_url {
            self.start_push_gateway_task(
                url.clone(),
                *push_gateway_interval,
                hooks.clone(),
                task_executor.clone(),
            )?;
        }

        // Register human-readable descriptions/units for the gauges and counters that other
        // parts of the node will emit.
        describe_db_metrics();
        describe_static_file_metrics();
        describe_rocksdb_metrics();
        Collector::default().describe();
        describe_memory_stats();
        describe_io_stats();

        // One-shot info metrics describing this build and the active chain.
        version_info.register_version_metrics();
        chain_spec_info.register_chain_spec_metrics();

        Ok(())
    }

    /// Binds `listen_addr` and spawns the HTTP accept loop on the task executor.
    ///
    /// `hook` runs before each metrics render (see [`handle_request`]). Each accepted
    /// connection is served on its own spawned task. This function resolves once the listener
    /// is bound and the loop is spawned; it does not wait for shutdown.
    async fn start_endpoint<F: Hook + 'static>(
        &self,
        listen_addr: SocketAddr,
        hook: Arc<F>,
        task_executor: TaskExecutor,
        pprof_dump_dir: PathBuf,
    ) -> eyre::Result<()> {
        let listener = tokio::net::TcpListener::bind(listen_addr)
            .await
            .wrap_err("Could not bind to address")?;

        // `unwrap` is safe in practice: `local_addr` on a freshly bound listener should not
        // fail.
        tracing::info!(target: "reth::cli", "Starting metrics endpoint at {}", listener.local_addr().unwrap());

        task_executor.spawn_with_graceful_shutdown_signal(async move |mut signal| loop {
            // Race shutdown against the next incoming connection; shutdown breaks the loop,
            // accept errors are logged and skipped.
            let io = tokio::select! {
                _ = &mut signal => break,
                io = listener.accept() => {
                    match io {
                        Ok((stream, _remote_addr)) => stream,
                        Err(err) => {
                            tracing::error!(%err, "failed to accept connection");
                            continue;
                        }
                    }
                }
            };

            let handle = install_prometheus_recorder();
            // Per-connection clones; the service closure below clones again per request.
            let hook = hook.clone();
            let pprof_dump_dir = pprof_dump_dir.clone();
            let service = tower::service_fn(move |req: Request<_>| {
                let hook = hook.clone();
                let pprof_dump_dir = pprof_dump_dir.clone();
                async move {
                    let response =
                        handle_request(req.uri().path(), &*hook, handle, &pprof_dump_dir).await;
                    Ok::<_, Infallible>(response)
                }
            });

            // NOTE(review): `ignore_guard` presumably detaches the shutdown guard so an open
            // connection does not block graceful shutdown — confirm with `reth_tasks` docs.
            let mut shutdown = signal.clone().ignore_guard();
            tokio::task::spawn(async move {
                let _ = jsonrpsee_server::serve_with_graceful_shutdown(io, service, &mut shutdown)
                    .await
                    .inspect_err(|error| tracing::debug!(%error, "failed to serve request"));
            });
        });

        Ok(())
    }

    /// Spawns a task that, every `interval`, runs the hooks and `PUT`s the rendered metrics
    /// to the push gateway at `url`, until shutdown is signalled.
    ///
    /// Push failures are logged as warnings and never abort the loop.
    ///
    /// # Errors
    ///
    /// Returns an error if the HTTP client cannot be constructed.
    fn start_push_gateway_task(
        &self,
        url: String,
        interval: Duration,
        hooks: Hooks,
        task_executor: TaskExecutor,
    ) -> eyre::Result<()> {
        let client = Client::builder()
            .build()
            .wrap_err("Could not create HTTP client to push metrics to gateway")?;
        task_executor.spawn_with_graceful_shutdown_signal(async move |mut signal| {
            tracing::info!(url = %url, interval = ?interval, "Starting task to push metrics to gateway");
            let handle = install_prometheus_recorder();
            loop {
                tokio::select! {
                    _ = &mut signal => {
                        tracing::info!("Shutting down task to push metrics to gateway");
                        break;
                    }
                    _ = tokio::time::sleep(interval) => {
                        // Refresh gauges via the hooks before rendering, mirroring the
                        // pull-endpoint behavior.
                        hooks.iter().for_each(|hook| hook());
                        let metrics = handle.handle().render();
                        match client.put(&url).header("Content-Type", "text/plain").body(metrics).send().await {
                            Ok(response) => {
                                if !response.status().is_success() {
                                    tracing::warn!(
                                        status = %response.status(),
                                        "Failed to push metrics to gateway"
                                    );
                                }
                            }
                            Err(err) => {
                                tracing::warn!(%err, "Failed to push metrics to gateway");
                            }
                        }
                    }
                }
            }
        });
        Ok(())
    }
}
214
215fn describe_db_metrics() {
216 describe_gauge!("db.table_size", Unit::Bytes, "The size of a database table (in bytes)");
217 describe_gauge!("db.table_pages", "The number of database pages for a table");
218 describe_gauge!("db.table_entries", "The number of entries for a table");
219 describe_gauge!("db.freelist", "The number of pages on the freelist");
220 describe_gauge!("db.page_size", Unit::Bytes, "The size of a database page (in bytes)");
221 describe_gauge!(
222 "db.timed_out_not_aborted_transactions",
223 "Number of timed out transactions that were not aborted by the user yet"
224 );
225}
226
227fn describe_static_file_metrics() {
228 describe_gauge!("static_files.segment_size", Unit::Bytes, "The size of a static file segment");
229 describe_gauge!("static_files.segment_files", "The number of files for a static file segment");
230 describe_gauge!(
231 "static_files.segment_entries",
232 "The number of entries for a static file segment"
233 );
234}
235
236fn describe_rocksdb_metrics() {
237 describe_gauge!(
238 "rocksdb.table_size",
239 Unit::Bytes,
240 "The estimated size of a RocksDB table (SST + memtable)"
241 );
242 describe_gauge!("rocksdb.table_entries", "The estimated number of keys in a RocksDB table");
243 describe_gauge!(
244 "rocksdb.pending_compaction_bytes",
245 Unit::Bytes,
246 "Bytes pending compaction for a RocksDB table"
247 );
248 describe_gauge!("rocksdb.sst_size", Unit::Bytes, "The size of SST files for a RocksDB table");
249 describe_gauge!(
250 "rocksdb.memtable_size",
251 Unit::Bytes,
252 "The size of memtables for a RocksDB table"
253 );
254 describe_gauge!(
255 "rocksdb.wal_size",
256 Unit::Bytes,
257 "The total size of WAL (Write-Ahead Log) files. Important: this is not included in table_size or sst_size metrics"
258 );
259}
260
/// Registers descriptions for the jemalloc memory gauges (unix + `jemalloc` feature only).
#[cfg(all(feature = "jemalloc", unix))]
fn describe_memory_stats() {
    describe_gauge!(
        "jemalloc.allocated",
        Unit::Bytes,
        "Total number of bytes allocated by the application"
    );
    describe_gauge!(
        "jemalloc.active",
        Unit::Bytes,
        "Total number of bytes in active pages allocated by the application"
    );
    describe_gauge!(
        "jemalloc.resident",
        Unit::Bytes,
        "Total number of bytes in physically resident data pages mapped by the allocator"
    );
    describe_gauge!(
        "jemalloc.mapped",
        Unit::Bytes,
        "Total number of bytes in active extents mapped by the allocator"
    );
    describe_gauge!(
        "jemalloc.retained",
        Unit::Bytes,
        "Total number of bytes in virtual memory mappings that were retained rather than \
        being returned to the operating system via e.g. munmap(2)"
    );
    describe_gauge!(
        "jemalloc.metadata",
        Unit::Bytes,
        "Total number of bytes dedicated to jemalloc metadata"
    );
}
295
/// No-op fallback when jemalloc metrics are unavailable (non-unix or feature disabled).
#[cfg(not(all(feature = "jemalloc", unix)))]
const fn describe_memory_stats() {}
298
299#[cfg(target_os = "linux")]
300fn describe_io_stats() {
301 use metrics::describe_counter;
302
303 describe_counter!("io.rchar", "Characters read");
304 describe_counter!("io.wchar", "Characters written");
305 describe_counter!("io.syscr", "Read syscalls");
306 describe_counter!("io.syscw", "Write syscalls");
307 describe_counter!("io.read_bytes", Unit::Bytes, "Bytes read");
308 describe_counter!("io.write_bytes", Unit::Bytes, "Bytes written");
309 describe_counter!("io.cancelled_write_bytes", Unit::Bytes, "Cancelled write bytes");
310}
311
/// No-op fallback for non-Linux targets where `/proc` I/O stats are unavailable.
#[cfg(not(target_os = "linux"))]
const fn describe_io_stats() {}
314
/// Routes a metrics-endpoint request by URI path.
///
/// * `/debug/pprof/heap` — jemalloc heap profile, see [`handle_pprof_heap`].
/// * `/debug/tokio/dump` — tokio task dump, see [`handle_tokio_dump`].
/// * any other path — runs `hook` (refreshes gauges) and returns the rendered Prometheus
///   text exposition as `text/plain`.
///
/// NOTE(review): `pprof_dump_dir: &PathBuf` would idiomatically be `&Path` (clippy
/// `ptr_arg`); kept as-is to match the cfg-gated [`handle_pprof_heap`] signatures.
async fn handle_request(
    path: &str,
    hook: impl Fn(),
    handle: &crate::recorder::PrometheusRecorder,
    pprof_dump_dir: &PathBuf,
) -> Response<Full<Bytes>> {
    match path {
        "/debug/pprof/heap" => handle_pprof_heap(pprof_dump_dir),
        "/debug/tokio/dump" => handle_tokio_dump().await,
        _ => {
            hook();
            let metrics = handle.handle().render();
            let mut response = Response::new(Full::new(Bytes::from(metrics)));
            response.headers_mut().insert(CONTENT_TYPE, HeaderValue::from_static("text/plain"));
            response
        }
    }
}
333
/// Serves a gzipped pprof heap profile produced by jemalloc.
///
/// Responses:
/// * 200 with `application/octet-stream` + `gzip` encoding on success,
/// * 503 if another dump is already in progress,
/// * 500 if jemalloc profiling is not enabled at runtime or the dump fails.
#[cfg(all(feature = "jemalloc-prof", unix))]
fn handle_pprof_heap(pprof_dump_dir: &PathBuf) -> Response<Full<Bytes>> {
    use http::header::CONTENT_ENCODING;

    match jemalloc_pprof::PROF_CTL.as_ref() {
        // `PROF_CTL` is `Some` only when profiling was activated at startup.
        Some(prof_ctl) => match prof_ctl.try_lock() {
            // The guard is bound to the scrutinee temporary, so the lock is held for the
            // whole arm — this serializes concurrent dump requests even though the dump
            // itself goes through the raw jemalloc ctl below.
            Ok(_) => match jemalloc_pprof_dump(pprof_dump_dir) {
                Ok(pprof) => {
                    let mut response = Response::new(Full::new(Bytes::from(pprof)));
                    response
                        .headers_mut()
                        .insert(CONTENT_TYPE, HeaderValue::from_static("application/octet-stream"));
                    response
                        .headers_mut()
                        .insert(CONTENT_ENCODING, HeaderValue::from_static("gzip"));
                    response
                }
                Err(err) => {
                    let mut response = Response::new(Full::new(Bytes::from(format!(
                        "Failed to dump pprof: {err}"
                    ))));
                    *response.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
                    response
                }
            },
            // Lock contended: another request is currently dumping.
            Err(_) => {
                let mut response = Response::new(Full::new(Bytes::from_static(
                    b"Profile dump already in progress. Try again later.",
                )));
                *response.status_mut() = StatusCode::SERVICE_UNAVAILABLE;
                response
            }
        },
        None => {
            let mut response = Response::new(Full::new(Bytes::from_static(
                b"jemalloc profiling not enabled. \
                Set MALLOC_CONF=prof:true or rebuild with jemalloc-prof feature.",
            )));
            *response.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
            response
        }
    }
}
377
/// Triggers a jemalloc heap-profile dump into `pprof_dump_dir` and converts it to the
/// gzipped pprof protobuf format.
///
/// The dump is written to a fresh temporary file inside `pprof_dump_dir` (the directory is
/// created if missing); the temp file is removed when its handle is dropped at the end of
/// this function.
///
/// # Errors
///
/// Returns an error if the directory or temp file cannot be created, the jemalloc
/// `prof.dump` control call fails, or the heap dump cannot be parsed.
#[cfg(all(feature = "jemalloc-prof", unix))]
fn jemalloc_pprof_dump(pprof_dump_dir: &Path) -> eyre::Result<Vec<u8>> {
    use std::{ffi::CString, io::BufReader};

    use mappings::MAPPINGS;
    use pprof_util::parse_jeheap;
    use tempfile::NamedTempFile;

    reth_fs_util::create_dir_all(pprof_dump_dir)?;
    let f = NamedTempFile::new_in(pprof_dump_dir)?;
    // jemalloc wants a NUL-terminated C string for the dump path.
    let path = CString::new(f.path().as_os_str().as_encoded_bytes())
        .wrap_err("pprof dump path contains an interior NUL byte")?;

    // SAFETY: `prof.dump` expects a valid, NUL-terminated `*const c_char`. `path` is a
    // `CString` that outlives the call, and the mallctl name is a NUL-terminated literal.
    unsafe { tikv_jemalloc_ctl::raw::write(b"prof.dump\0", path.as_ptr()) }?;

    // Re-read the contents jemalloc just wrote at the temp file's path.
    let dump_reader = BufReader::new(f);
    let profile =
        parse_jeheap(dump_reader, MAPPINGS.as_deref()).map_err(|err| eyre::eyre!(Box::new(err)))?;
    let pprof = profile.to_pprof(("inuse_space", "bytes"), ("space", "bytes"), None);

    Ok(pprof)
}
403
404#[cfg(not(all(feature = "jemalloc-prof", unix)))]
405fn handle_pprof_heap(_pprof_dump_dir: &PathBuf) -> Response<Full<Bytes>> {
406 let mut response = Response::new(Full::new(Bytes::from_static(
407 b"jemalloc pprof support not compiled. Rebuild with the jemalloc-prof feature.",
408 )));
409 *response.status_mut() = StatusCode::NOT_IMPLEMENTED;
410 response
411}
412
/// Renders a plain-text trace of every task known to the current tokio runtime
/// (requires `--cfg tokio_unstable`).
#[cfg(tokio_unstable)]
async fn handle_tokio_dump() -> Response<Full<Bytes>> {
    // Capture a snapshot of all runtime tasks.
    let dump = tokio::runtime::Handle::current().dump().await;

    // One "task N:" section per task, separated by blank lines.
    let output: String = dump
        .tasks()
        .iter()
        .enumerate()
        .map(|(i, task)| format!("task {i}:\n{}\n\n", task.trace()))
        .collect();

    let mut response = Response::new(Full::new(Bytes::from(output)));
    response.headers_mut().insert(CONTENT_TYPE, HeaderValue::from_static("text/plain"));
    response
}
428
429#[cfg(not(tokio_unstable))]
430async fn handle_tokio_dump() -> Response<Full<Bytes>> {
431 let mut response = Response::new(Full::new(Bytes::from_static(
432 b"tokio task dump not available. Rebuild with RUSTFLAGS=\"--cfg tokio_unstable\" and tokio's `taskdump` feature.",
433 )));
434 *response.status_mut() = StatusCode::NOT_IMPLEMENTED;
435 response
436}
437
#[cfg(test)]
mod tests {
    use super::*;
    use reqwest::Client;
    use reth_tasks::Runtime;
    use socket2::{Domain, Socket, Type};
    use std::net::{SocketAddr, TcpListener};

    /// Picks a free localhost port by binding port 0 with `SO_REUSEADDR` and returning the
    /// assigned address.
    ///
    /// NOTE(review): the listener is dropped on return, so there is a small window in which
    /// another process could grab the port before the server binds it — acceptable for tests.
    fn get_random_available_addr() -> SocketAddr {
        // `&...into()` binds a reference to a temporary (socket2 address type); valid here
        // thanks to temporary lifetime extension.
        let addr = &"127.0.0.1:0".parse::<SocketAddr>().unwrap().into();
        let socket = Socket::new(Domain::IPV4, Type::STREAM, None).unwrap();
        socket.set_reuse_address(true).unwrap();
        socket.bind(addr).unwrap();
        socket.listen(1).unwrap();
        let listener = TcpListener::from(socket);
        listener.local_addr().unwrap()
    }

    /// End-to-end check: start the server, scrape it over HTTP, and assert that process
    /// metrics appear in the rendered output.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_metrics_endpoint() {
        let chain_spec_info = ChainSpecInfo { name: "test".to_string() };
        let version_info = VersionInfo {
            version: "test",
            build_timestamp: "test",
            cargo_features: "test",
            git_sha: "test",
            target_triple: "test",
            build_profile: "test",
        };

        let runtime = Runtime::test();

        // Empty hook set: the default render path is exercised without extra updaters.
        let hooks = Hooks::builder().build();

        let listen_addr = get_random_available_addr();
        let config = MetricServerConfig::new(
            listen_addr,
            version_info,
            chain_spec_info,
            runtime.clone(),
            hooks,
            std::env::temp_dir(),
        );

        // `serve` returns once the endpoint is spawned, so the scrape below can proceed.
        MetricServer::new(config).serve().await.unwrap();

        let url = format!("http://{listen_addr}");
        let response = Client::new().get(&url).send().await.unwrap();
        assert!(response.status().is_success());

        // Process metrics come from the `metrics_process` collector described in `serve`.
        let body = response.text().await.unwrap();
        assert!(body.contains("reth_process_cpu_seconds_total"));
        assert!(body.contains("reth_process_start_time_seconds"));

        // Dropping the runtime triggers graceful shutdown of the spawned tasks.
        drop(runtime);
    }
}