// reth_node_metrics/server.rs
1use crate::{
2    chain::ChainSpecInfo,
3    hooks::{Hook, Hooks},
4    process::register_process_metrics,
5    recorder::install_prometheus_recorder,
6    version::VersionInfo,
7};
8use bytes::Bytes;
9use eyre::WrapErr;
10use http::{header::CONTENT_TYPE, HeaderValue, Request, Response, StatusCode};
11use http_body_util::Full;
12use metrics::describe_gauge;
13use metrics_process::Collector;
14use reqwest::Client;
15use reth_metrics::metrics::Unit;
16use reth_tasks::TaskExecutor;
17use std::{convert::Infallible, net::SocketAddr, path::PathBuf, sync::Arc, time::Duration};
18
/// Configuration for the [`MetricServer`]
#[derive(Debug)]
pub struct MetricServerConfig {
    /// Address the Prometheus HTTP endpoint binds to.
    listen_addr: SocketAddr,
    /// Build/version metadata registered via `register_version_metrics` during `serve`.
    version_info: VersionInfo,
    /// Chain-spec metadata registered via `register_chain_spec_metrics` during `serve`.
    chain_spec_info: ChainSpecInfo,
    /// Executor used to spawn the endpoint accept loop and the push-gateway task.
    task_executor: TaskExecutor,
    /// Callbacks run before each metrics render to refresh gauge values.
    hooks: Hooks,
    /// Optional push-gateway URL; when `Some`, metrics are periodically pushed to it.
    push_gateway_url: Option<String>,
    /// Interval between pushes to the gateway (defaults to 5s in [`Self::new`]).
    push_gateway_interval: Duration,
    /// Directory where temporary jemalloc heap-profile dump files are written.
    pprof_dump_dir: PathBuf,
}
31
32impl MetricServerConfig {
33    /// Create a new [`MetricServerConfig`] with the given configuration
34    pub const fn new(
35        listen_addr: SocketAddr,
36        version_info: VersionInfo,
37        chain_spec_info: ChainSpecInfo,
38        task_executor: TaskExecutor,
39        hooks: Hooks,
40        pprof_dump_dir: PathBuf,
41    ) -> Self {
42        Self {
43            listen_addr,
44            hooks,
45            task_executor,
46            version_info,
47            chain_spec_info,
48            push_gateway_url: None,
49            push_gateway_interval: Duration::from_secs(5),
50            pprof_dump_dir,
51        }
52    }
53
54    /// Set the gateway URL and interval for pushing metrics
55    pub fn with_push_gateway(mut self, url: Option<String>, interval: Duration) -> Self {
56        self.push_gateway_url = url;
57        self.push_gateway_interval = interval;
58        self
59    }
60}
61
/// [`MetricServer`] responsible for serving the metrics endpoint
#[derive(Debug)]
pub struct MetricServer {
    /// Full server configuration (listen address, hooks, executor, push-gateway settings).
    config: MetricServerConfig,
}
67
impl MetricServer {
    /// Create a new [`MetricServer`] with the given configuration
    pub const fn new(config: MetricServerConfig) -> Self {
        Self { config }
    }

    /// Spawns the metrics server
    ///
    /// Binds the Prometheus HTTP endpoint, optionally starts the push-gateway task,
    /// then describes and registers the static process/db metrics. Returns once the
    /// listener is bound; request handling continues on spawned background tasks.
    pub async fn serve(&self) -> eyre::Result<()> {
        let MetricServerConfig {
            listen_addr,
            hooks,
            task_executor,
            version_info,
            chain_spec_info,
            push_gateway_url,
            push_gateway_interval,
            pprof_dump_dir,
        } = &self.config;

        // The endpoint runs every registered hook on each scrape so gauge values
        // are refreshed right before rendering.
        let hooks_for_endpoint = hooks.clone();
        self.start_endpoint(
            *listen_addr,
            Arc::new(move || hooks_for_endpoint.iter().for_each(|hook| hook())),
            task_executor.clone(),
            pprof_dump_dir.clone(),
        )
        .await
        .wrap_err_with(|| format!("Could not start Prometheus endpoint at {listen_addr}"))?;

        // Start push-gateway task if configured
        if let Some(url) = push_gateway_url {
            self.start_push_gateway_task(
                url.clone(),
                *push_gateway_interval,
                hooks.clone(),
                task_executor.clone(),
            )?;
        }

        // Describe metrics after recorder installation
        describe_db_metrics();
        describe_static_file_metrics();
        describe_rocksdb_metrics();
        Collector::default().describe();
        describe_memory_stats();
        describe_io_stats();

        version_info.register_version_metrics();
        chain_spec_info.register_chain_spec_metrics();
        register_process_metrics();

        Ok(())
    }

    /// Binds a TCP listener on `listen_addr` and serves metrics/debug HTTP requests
    /// until the executor's graceful-shutdown signal fires.
    ///
    /// `hook` is invoked on every scrape before rendering; `pprof_dump_dir` is the
    /// directory used by the `/debug/pprof/heap` handler for temporary dump files.
    async fn start_endpoint<F: Hook + 'static>(
        &self,
        listen_addr: SocketAddr,
        hook: Arc<F>,
        task_executor: TaskExecutor,
        pprof_dump_dir: PathBuf,
    ) -> eyre::Result<()> {
        let listener = tokio::net::TcpListener::bind(listen_addr)
            .await
            .wrap_err("Could not bind to address")?;

        tracing::info!(target: "reth::cli", "Starting metrics endpoint at {}", listener.local_addr().unwrap());

        // Accept loop: exits when the shutdown signal resolves; accept errors are
        // logged and the loop keeps serving subsequent connections.
        task_executor.spawn_with_graceful_shutdown_signal(async move |mut signal| loop {
            let io = tokio::select! {
                _ = &mut signal => break,
                io = listener.accept() => {
                    match io {
                        Ok((stream, _remote_addr)) => stream,
                        Err(err) => {
                            tracing::error!(%err, "failed to accept connection");
                            continue;
                        }
                    }
                }
            };

            // Recorder installation is expected to be idempotent here: this returns a
            // handle to the (already installed) global Prometheus recorder.
            let handle = install_prometheus_recorder();
            let hook = hook.clone();
            let pprof_dump_dir = pprof_dump_dir.clone();
            let service = tower::service_fn(move |req: Request<_>| {
                // Per-request clones: the service closure may be called concurrently.
                let hook = hook.clone();
                let pprof_dump_dir = pprof_dump_dir.clone();
                async move {
                    let response =
                        handle_request(req.uri().path(), &*hook, handle, &pprof_dump_dir).await;
                    Ok::<_, Infallible>(response)
                }
            });

            // NOTE(review): `ignore_guard` presumably detaches the shutdown guard so
            // in-flight connections don't block executor shutdown — confirm semantics.
            let mut shutdown = signal.clone().ignore_guard();
            tokio::task::spawn(async move {
                let _ = jsonrpsee_server::serve_with_graceful_shutdown(io, service, &mut shutdown)
                    .await
                    .inspect_err(|error| tracing::debug!(%error, "failed to serve request"));
            });
        });

        Ok(())
    }

    /// Starts a background task to push metrics to a metrics gateway
    ///
    /// Every `interval`, the task runs all `hooks`, renders the recorder state and
    /// HTTP-PUTs the text exposition to `url`. Push failures are logged as warnings
    /// and the loop continues; the task exits when the shutdown signal resolves.
    fn start_push_gateway_task(
        &self,
        url: String,
        interval: Duration,
        hooks: Hooks,
        task_executor: TaskExecutor,
    ) -> eyre::Result<()> {
        let client = Client::builder()
            .build()
            .wrap_err("Could not create HTTP client to push metrics to gateway")?;
        task_executor.spawn_with_graceful_shutdown_signal(async move |mut signal| {
            tracing::info!(url = %url, interval = ?interval, "Starting task to push metrics to gateway");
            let handle = install_prometheus_recorder();
            loop {
                tokio::select! {
                    _ = &mut signal => {
                        tracing::info!("Shutting down task to push metrics to gateway");
                        break;
                    }
                    _ = tokio::time::sleep(interval) => {
                        // Refresh gauges before rendering, same as a scrape would.
                        hooks.iter().for_each(|hook| hook());
                        let metrics = handle.handle().render();
                        match client.put(&url).header("Content-Type", "text/plain").body(metrics).send().await {
                            Ok(response) => {
                                if !response.status().is_success() {
                                    tracing::warn!(
                                        status = %response.status(),
                                        "Failed to push metrics to gateway"
                                    );
                                }
                            }
                            Err(err) => {
                                tracing::warn!(%err, "Failed to push metrics to gateway");
                            }
                        }
                    }
                }
            }
        });
        Ok(())
    }
}
216
217fn describe_db_metrics() {
218    describe_gauge!("db.table_size", Unit::Bytes, "The size of a database table (in bytes)");
219    describe_gauge!("db.table_pages", "The number of database pages for a table");
220    describe_gauge!("db.table_entries", "The number of entries for a table");
221    describe_gauge!("db.freelist", "The number of pages on the freelist");
222    describe_gauge!("db.page_size", Unit::Bytes, "The size of a database page (in bytes)");
223    describe_gauge!(
224        "db.timed_out_not_aborted_transactions",
225        "Number of timed out transactions that were not aborted by the user yet"
226    );
227}
228
229fn describe_static_file_metrics() {
230    describe_gauge!("static_files.segment_size", Unit::Bytes, "The size of a static file segment");
231    describe_gauge!("static_files.segment_files", "The number of files for a static file segment");
232    describe_gauge!(
233        "static_files.segment_entries",
234        "The number of entries for a static file segment"
235    );
236}
237
238fn describe_rocksdb_metrics() {
239    describe_gauge!(
240        "rocksdb.table_size",
241        Unit::Bytes,
242        "The estimated size of a RocksDB table (SST + memtable)"
243    );
244    describe_gauge!("rocksdb.table_entries", "The estimated number of keys in a RocksDB table");
245    describe_gauge!(
246        "rocksdb.pending_compaction_bytes",
247        Unit::Bytes,
248        "Bytes pending compaction for a RocksDB table"
249    );
250    describe_gauge!("rocksdb.sst_size", Unit::Bytes, "The size of SST files for a RocksDB table");
251    describe_gauge!(
252        "rocksdb.memtable_size",
253        Unit::Bytes,
254        "The size of memtables for a RocksDB table"
255    );
256    describe_gauge!(
257        "rocksdb.wal_size",
258        Unit::Bytes,
259        "The total size of WAL (Write-Ahead Log) files. Important: this is not included in table_size or sst_size metrics"
260    );
261}
262
/// Describes the jemalloc memory gauges (unix-only, `jemalloc` feature).
#[cfg(all(feature = "jemalloc", unix))]
fn describe_memory_stats() {
    // Statistic semantics follow jemalloc's stats.* mallctl namespace.
    describe_gauge!(
        "jemalloc.allocated",
        Unit::Bytes,
        "Total number of bytes allocated by the application"
    );
    describe_gauge!(
        "jemalloc.active",
        Unit::Bytes,
        "Total number of bytes in active pages allocated by the application"
    );
    describe_gauge!(
        "jemalloc.resident",
        Unit::Bytes,
        "Total number of bytes in physically resident data pages mapped by the allocator"
    );
    describe_gauge!(
        "jemalloc.mapped",
        Unit::Bytes,
        "Total number of bytes in active extents mapped by the allocator"
    );
    describe_gauge!(
        "jemalloc.retained",
        Unit::Bytes,
        "Total number of bytes in virtual memory mappings that were retained rather than \
        being returned to the operating system via e.g. munmap(2)"
    );
    describe_gauge!(
        "jemalloc.metadata",
        Unit::Bytes,
        "Total number of bytes dedicated to jemalloc metadata"
    );
}
297
/// No-op fallback when jemalloc statistics are unavailable (non-unix targets
/// or the `jemalloc` feature disabled).
#[cfg(not(all(feature = "jemalloc", unix)))]
const fn describe_memory_stats() {}
300
301#[cfg(target_os = "linux")]
302fn describe_io_stats() {
303    use metrics::describe_counter;
304
305    describe_counter!("io.rchar", "Characters read");
306    describe_counter!("io.wchar", "Characters written");
307    describe_counter!("io.syscr", "Read syscalls");
308    describe_counter!("io.syscw", "Write syscalls");
309    describe_counter!("io.read_bytes", Unit::Bytes, "Bytes read");
310    describe_counter!("io.write_bytes", Unit::Bytes, "Bytes written");
311    describe_counter!("io.cancelled_write_bytes", Unit::Bytes, "Cancelled write bytes");
312}
313
/// No-op fallback: per-process I/O counters are only collected on Linux.
#[cfg(not(target_os = "linux"))]
const fn describe_io_stats() {}
316
317async fn handle_request(
318    path: &str,
319    hook: impl Fn(),
320    handle: &crate::recorder::PrometheusRecorder,
321    pprof_dump_dir: &PathBuf,
322) -> Response<Full<Bytes>> {
323    match path {
324        "/debug/pprof/heap" => handle_pprof_heap(pprof_dump_dir),
325        "/debug/tokio/dump" => handle_tokio_dump().await,
326        _ => {
327            hook();
328            let metrics = handle.handle().render();
329            let mut response = Response::new(Full::new(Bytes::from(metrics)));
330            response.headers_mut().insert(CONTENT_TYPE, HeaderValue::from_static("text/plain"));
331            response
332        }
333    }
334}
335
/// Serves `/debug/pprof/heap`: dumps and returns a jemalloc heap profile, or an
/// error response when profiling is unavailable or a dump is already running.
#[cfg(all(feature = "jemalloc-prof", unix))]
fn handle_pprof_heap(pprof_dump_dir: &PathBuf) -> Response<Full<Bytes>> {
    use http::header::CONTENT_ENCODING;

    // Small helper for the error paths: plain-text body with the given status.
    fn error_response(status: StatusCode, body: Bytes) -> Response<Full<Bytes>> {
        let mut response = Response::new(Full::new(body));
        *response.status_mut() = status;
        response
    }

    let Some(prof_ctl) = jemalloc_pprof::PROF_CTL.as_ref() else {
        return error_response(
            StatusCode::INTERNAL_SERVER_ERROR,
            Bytes::from_static(
                b"jemalloc profiling not enabled. \
                 Set MALLOC_CONF=prof:true or rebuild with jemalloc-prof feature.",
            ),
        );
    };

    // Only one dump at a time: the guard stays alive for the rest of the
    // function, covering the dump below.
    let Ok(_guard) = prof_ctl.try_lock() else {
        return error_response(
            StatusCode::SERVICE_UNAVAILABLE,
            Bytes::from_static(b"Profile dump already in progress. Try again later."),
        );
    };

    match jemalloc_pprof_dump(pprof_dump_dir) {
        Ok(pprof) => {
            let mut response = Response::new(Full::new(Bytes::from(pprof)));
            let headers = response.headers_mut();
            headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/octet-stream"));
            headers.insert(CONTENT_ENCODING, HeaderValue::from_static("gzip"));
            response
        }
        Err(err) => error_response(
            StatusCode::INTERNAL_SERVER_ERROR,
            Bytes::from(format!("Failed to dump pprof: {err}")),
        ),
    }
}
379
/// Equivalent to [`jemalloc_pprof::JemallocProfCtl::dump`], but accepts a directory that the
/// temporary pprof file will be written to. The file is deleted when the function exits.
///
/// Returns the pprof-encoded heap profile bytes (served with `Content-Encoding: gzip`
/// by the caller — presumably already gzip-compressed by `to_pprof`; confirm).
#[cfg(all(feature = "jemalloc-prof", unix))]
fn jemalloc_pprof_dump(pprof_dump_dir: &PathBuf) -> eyre::Result<Vec<u8>> {
    use std::{ffi::CString, io::BufReader};

    use mappings::MAPPINGS;
    use pprof_util::parse_jeheap;
    use tempfile::NamedTempFile;

    // Ensure the target directory exists; the temp file is created inside it and
    // removed automatically when `f` is dropped at the end of this function.
    reth_fs_util::create_dir_all(pprof_dump_dir)?;
    let f = NamedTempFile::new_in(pprof_dump_dir)?;
    // jemalloc's mallctl interface takes a NUL-terminated C string for the dump path.
    let path = CString::new(f.path().as_os_str().as_encoded_bytes()).unwrap();

    // SAFETY: "prof.dump" is documented as being writable and taking a C string as input:
    // http://jemalloc.net/jemalloc.3.html#prof.dump
    unsafe { tikv_jemalloc_ctl::raw::write(b"prof.dump\0", path.as_ptr()) }?;

    // Parse the jeheap dump jemalloc just wrote and symbolize it against the
    // process mappings, then encode as pprof.
    let dump_reader = BufReader::new(f);
    let profile =
        parse_jeheap(dump_reader, MAPPINGS.as_deref()).map_err(|err| eyre::eyre!(Box::new(err)))?;
    let pprof = profile.to_pprof(("inuse_space", "bytes"), ("space", "bytes"), None);

    Ok(pprof)
}
405
406#[cfg(not(all(feature = "jemalloc-prof", unix)))]
407fn handle_pprof_heap(_pprof_dump_dir: &PathBuf) -> Response<Full<Bytes>> {
408    let mut response = Response::new(Full::new(Bytes::from_static(
409        b"jemalloc pprof support not compiled. Rebuild with the jemalloc-prof feature.",
410    )));
411    *response.status_mut() = StatusCode::NOT_IMPLEMENTED;
412    response
413}
414
/// Serves `/debug/tokio/dump`: captures a task dump of the current tokio
/// runtime and returns one backtrace per task as plain text.
#[cfg(tokio_unstable)]
async fn handle_tokio_dump() -> Response<Full<Bytes>> {
    use std::fmt::Write;

    let handle = tokio::runtime::Handle::current();
    let dump = handle.dump().await;

    // `write!` appends directly into `output` instead of allocating an
    // intermediate String per task (as `push_str(&format!(..))` did).
    // Writing to a String is infallible, so the result can be ignored.
    let mut output = String::new();
    for (i, task) in dump.tasks().iter().enumerate() {
        let trace = task.trace();
        let _ = write!(output, "task {i}:\n{trace}\n\n");
    }

    let mut response = Response::new(Full::new(Bytes::from(output)));
    response.headers_mut().insert(CONTENT_TYPE, HeaderValue::from_static("text/plain"));
    response
}
430
431#[cfg(not(tokio_unstable))]
432async fn handle_tokio_dump() -> Response<Full<Bytes>> {
433    let mut response = Response::new(Full::new(Bytes::from_static(
434        b"tokio task dump not available. Rebuild with RUSTFLAGS=\"--cfg tokio_unstable\" and tokio's `taskdump` feature.",
435    )));
436    *response.status_mut() = StatusCode::NOT_IMPLEMENTED;
437    response
438}
439
#[cfg(test)]
mod tests {
    use super::*;
    use reqwest::Client;
    use reth_tasks::Runtime;
    use socket2::{Domain, Socket, Type};
    use std::net::{SocketAddr, TcpListener};

    /// Binds an OS-assigned port with `SO_REUSEADDR` and returns the chosen
    /// address so the server under test can bind it right afterwards.
    fn get_random_available_addr() -> SocketAddr {
        // `.into()` converts the std `SocketAddr` into socket2's address type.
        let addr = &"127.0.0.1:0".parse::<SocketAddr>().unwrap().into();
        let socket = Socket::new(Domain::IPV4, Type::STREAM, None).unwrap();
        socket.set_reuse_address(true).unwrap();
        socket.bind(addr).unwrap();
        socket.listen(1).unwrap();
        // Dropping the listener frees the port; SO_REUSEADDR lets the server
        // re-bind it immediately (small TOCTOU race accepted for tests).
        let listener = TcpListener::from(socket);
        listener.local_addr().unwrap()
    }

    /// End-to-end check: serve the endpoint, scrape it once over HTTP, and
    /// assert the rendered body contains the expected process metrics.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_metrics_endpoint() {
        // Install the recorder before serve() so gauge registrations are captured,
        // mirroring how start_prometheus_endpoint() works in the real node launch.
        install_prometheus_recorder();

        let chain_spec_info = ChainSpecInfo { name: "test".to_string() };
        let version_info = VersionInfo {
            version: "test",
            build_timestamp: "test",
            cargo_features: "test",
            git_sha: "test",
            target_triple: "test",
            build_profile: "test",
        };

        let runtime = Runtime::test();

        // Empty hook set: the scrape still renders the registered metrics.
        let hooks = Hooks::builder().build();

        let listen_addr = get_random_available_addr();
        let config = MetricServerConfig::new(
            listen_addr,
            version_info,
            chain_spec_info,
            runtime.clone(),
            hooks,
            std::env::temp_dir(),
        );

        MetricServer::new(config).serve().await.unwrap();

        // Send request to the metrics endpoint
        let url = format!("http://{listen_addr}");
        let response = Client::new().get(&url).send().await.unwrap();
        assert!(response.status().is_success());

        // Check the response body
        let body = response.text().await.unwrap();
        assert!(body.contains("reth_process_cpu_seconds_total"));
        assert!(body.contains("reth_process_start_time_seconds"));
        assert!(body.contains("process_cli_args"), "expected process_cli_args metric in output");

        // Make sure the runtime is dropped after the test runs.
        drop(runtime);
    }
}
504}