Skip to main content

reth_node_metrics/
server.rs

1use crate::{
2    chain::ChainSpecInfo,
3    hooks::{Hook, Hooks},
4    process::register_process_metrics,
5    recorder::install_prometheus_recorder,
6    storage::StorageSettingsInfo,
7    version::VersionInfo,
8};
9use bytes::Bytes;
10use eyre::WrapErr;
11use http::{header::CONTENT_TYPE, HeaderValue, Request, Response, StatusCode};
12use http_body_util::Full;
13use metrics::describe_gauge;
14use metrics_process::Collector;
15use reqwest::Client;
16use reth_metrics::metrics::Unit;
17use reth_tasks::TaskExecutor;
18use std::{convert::Infallible, net::SocketAddr, path::PathBuf, sync::Arc, time::Duration};
19
/// Configuration for the [`MetricServer`]
#[derive(Debug)]
pub struct MetricServerConfig {
    /// Address the Prometheus HTTP endpoint binds to.
    listen_addr: SocketAddr,
    /// Build/version information exposed as static metrics.
    version_info: VersionInfo,
    /// Chain spec information exposed as static metrics.
    chain_spec_info: ChainSpecInfo,
    /// Optional storage settings exposed as static metrics; `None` means not reported.
    storage_settings_info: Option<StorageSettingsInfo>,
    /// Executor used to spawn the endpoint accept loop and the push-gateway task.
    task_executor: TaskExecutor,
    /// Hooks invoked before every render so gauges reflect current state.
    hooks: Hooks,
    /// Push-gateway target; `None` disables pushing entirely.
    push_gateway_url: Option<String>,
    /// Interval between pushes to the gateway (only used when a URL is set).
    push_gateway_interval: Duration,
    /// Directory where temporary jemalloc pprof dump files are written.
    pprof_dump_dir: PathBuf,
}
33
34impl MetricServerConfig {
35    /// Create a new [`MetricServerConfig`] with the given configuration
36    pub const fn new(
37        listen_addr: SocketAddr,
38        version_info: VersionInfo,
39        chain_spec_info: ChainSpecInfo,
40        task_executor: TaskExecutor,
41        hooks: Hooks,
42        pprof_dump_dir: PathBuf,
43    ) -> Self {
44        Self {
45            listen_addr,
46            hooks,
47            task_executor,
48            version_info,
49            chain_spec_info,
50            storage_settings_info: None,
51            push_gateway_url: None,
52            push_gateway_interval: Duration::from_secs(5),
53            pprof_dump_dir,
54        }
55    }
56
57    /// Set the storage settings information to expose over prometheus.
58    pub fn with_storage_settings_info(mut self, info: StorageSettingsInfo) -> Self {
59        self.storage_settings_info = Some(info);
60        self
61    }
62
63    /// Set the gateway URL and interval for pushing metrics
64    pub fn with_push_gateway(mut self, url: Option<String>, interval: Duration) -> Self {
65        self.push_gateway_url = url;
66        self.push_gateway_interval = interval;
67        self
68    }
69}
70
/// [`MetricServer`] responsible for serving the metrics endpoint
#[derive(Debug)]
pub struct MetricServer {
    /// Immutable configuration the server was constructed with.
    config: MetricServerConfig,
}
76
impl MetricServer {
    /// Create a new [`MetricServer`] with the given configuration
    pub const fn new(config: MetricServerConfig) -> Self {
        Self { config }
    }

    /// Spawns the metrics server
    ///
    /// Binds the Prometheus HTTP endpoint, optionally starts the push-gateway task, and
    /// registers metric descriptions and static info metrics with the installed recorder.
    /// Returns once the background tasks are spawned; serving continues until shutdown.
    pub async fn serve(&self) -> eyre::Result<()> {
        let MetricServerConfig {
            listen_addr,
            hooks,
            task_executor,
            version_info,
            chain_spec_info,
            storage_settings_info,
            push_gateway_url,
            push_gateway_interval,
            pprof_dump_dir,
        } = &self.config;

        // The endpoint gets its own clone of the hooks; every scrape runs all hooks so
        // gauges are refreshed right before the registry is rendered.
        let hooks_for_endpoint = hooks.clone();
        self.start_endpoint(
            *listen_addr,
            Arc::new(move || hooks_for_endpoint.iter().for_each(|hook| hook())),
            task_executor.clone(),
            pprof_dump_dir.clone(),
        )
        .await
        .wrap_err_with(|| format!("Could not start Prometheus endpoint at {listen_addr}"))?;

        // Start push-gateway task if configured
        if let Some(url) = push_gateway_url {
            self.start_push_gateway_task(
                url.clone(),
                *push_gateway_interval,
                hooks.clone(),
                task_executor.clone(),
            )?;
        }

        // Describe metrics after recorder installation
        // (descriptions registered before a recorder exists would be lost).
        describe_db_metrics();
        describe_static_file_metrics();
        describe_rocksdb_metrics();
        Collector::default().describe();
        describe_memory_stats();
        describe_io_stats();

        // Static "info"-style metrics: constant labels describing this build/chain/storage.
        version_info.register_version_metrics();
        chain_spec_info.register_chain_spec_metrics();
        if let Some(storage_settings_info) = storage_settings_info {
            storage_settings_info.register_storage_settings_metrics();
        }
        register_process_metrics();

        Ok(())
    }

    /// Binds `listen_addr` and spawns the accept loop serving the metrics endpoint.
    ///
    /// The loop runs until the task executor's graceful-shutdown signal fires; each
    /// accepted connection is served on its own spawned task.
    async fn start_endpoint<F: Hook + 'static>(
        &self,
        listen_addr: SocketAddr,
        hook: Arc<F>,
        task_executor: TaskExecutor,
        pprof_dump_dir: PathBuf,
    ) -> eyre::Result<()> {
        let listener = tokio::net::TcpListener::bind(listen_addr)
            .await
            .wrap_err("Could not bind to address")?;

        // local_addr() on a freshly bound listener is effectively infallible, hence unwrap.
        tracing::info!(target: "reth::cli", "Starting metrics endpoint at {}", listener.local_addr().unwrap());

        task_executor.spawn_with_graceful_shutdown_signal(async move |mut signal| loop {
            // Race shutdown against accept: shutdown wins -> exit the loop; accept errors
            // are logged and skipped so one bad connection can't kill the endpoint.
            let io = tokio::select! {
                _ = &mut signal => break,
                io = listener.accept() => {
                    match io {
                        Ok((stream, _remote_addr)) => stream,
                        Err(err) => {
                            tracing::error!(%err, "failed to accept connection");
                            continue;
                        }
                    }
                }
            };

            // NOTE(review): presumably idempotent, returning a handle to the globally
            // installed recorder on repeated calls — confirm in crate::recorder.
            let handle = install_prometheus_recorder();
            let hook = hook.clone();
            let pprof_dump_dir = pprof_dump_dir.clone();
            let service = tower::service_fn(move |req: Request<_>| {
                // Clone per request: the service closure may be invoked multiple times.
                let hook = hook.clone();
                let pprof_dump_dir = pprof_dump_dir.clone();
                async move {
                    let response =
                        handle_request(req.uri().path(), &*hook, handle, &pprof_dump_dir).await;
                    Ok::<_, Infallible>(response)
                }
            });

            // NOTE(review): ignore_guard appears to detach the shutdown guard so the cloned
            // signal can be moved into the per-connection task — confirm in reth_tasks.
            let mut shutdown = signal.clone().ignore_guard();
            tokio::task::spawn(async move {
                // Serve errors are expected (e.g. client disconnects) and only logged at debug.
                let _ = jsonrpsee_server::serve_with_graceful_shutdown(io, service, &mut shutdown)
                    .await
                    .inspect_err(|error| tracing::debug!(%error, "failed to serve request"));
            });
        });

        Ok(())
    }

    /// Starts a background task to push metrics to a metrics gateway
    ///
    /// Every `interval`, the task runs the hooks, renders the registry, and PUTs the
    /// text payload to `url`. Push failures are logged and retried on the next tick;
    /// the task exits on the graceful-shutdown signal.
    fn start_push_gateway_task(
        &self,
        url: String,
        interval: Duration,
        hooks: Hooks,
        task_executor: TaskExecutor,
    ) -> eyre::Result<()> {
        let client = Client::builder()
            .build()
            .wrap_err("Could not create HTTP client to push metrics to gateway")?;
        task_executor.spawn_with_graceful_shutdown_signal(async move |mut signal| {
            tracing::info!(url = %url, interval = ?interval, "Starting task to push metrics to gateway");
            let handle = install_prometheus_recorder();
            loop {
                tokio::select! {
                    _ = &mut signal => {
                        tracing::info!("Shutting down task to push metrics to gateway");
                        break;
                    }
                    _ = tokio::time::sleep(interval) => {
                        // Refresh gauges via the hooks, then push a rendered snapshot.
                        hooks.iter().for_each(|hook| hook());
                        let metrics = handle.handle().render();
                        match client.put(&url).header("Content-Type", "text/plain").body(metrics).send().await {
                            Ok(response) => {
                                // Non-2xx responses are warnings, not errors: pushing is best-effort.
                                if !response.status().is_success() {
                                    tracing::warn!(
                                        status = %response.status(),
                                        "Failed to push metrics to gateway"
                                    );
                                }
                            }
                            Err(err) => {
                                tracing::warn!(%err, "Failed to push metrics to gateway");
                            }
                        }
                    }
                }
            }
        });
        Ok(())
    }
}
229
/// Registers descriptions (and units where applicable) for the database gauges.
///
/// Called from `serve` after the global recorder is installed so the descriptions
/// are recorded rather than dropped.
fn describe_db_metrics() {
    describe_gauge!("db.table_size", Unit::Bytes, "The size of a database table (in bytes)");
    describe_gauge!("db.table_pages", "The number of database pages for a table");
    describe_gauge!("db.table_entries", "The number of entries for a table");
    describe_gauge!("db.freelist", "The number of pages on the freelist");
    describe_gauge!("db.page_size", Unit::Bytes, "The size of a database page (in bytes)");
    describe_gauge!(
        "db.timed_out_not_aborted_transactions",
        "Number of timed out transactions that were not aborted by the user yet"
    );
}
241
/// Registers descriptions for the static-file segment gauges.
fn describe_static_file_metrics() {
    describe_gauge!("static_files.segment_size", Unit::Bytes, "The size of a static file segment");
    describe_gauge!("static_files.segment_files", "The number of files for a static file segment");
    describe_gauge!(
        "static_files.segment_entries",
        "The number of entries for a static file segment"
    );
}
250
/// Registers descriptions for the RocksDB gauges.
fn describe_rocksdb_metrics() {
    describe_gauge!(
        "rocksdb.table_size",
        Unit::Bytes,
        "The estimated size of a RocksDB table (SST + memtable)"
    );
    describe_gauge!("rocksdb.table_entries", "The estimated number of keys in a RocksDB table");
    describe_gauge!(
        "rocksdb.pending_compaction_bytes",
        Unit::Bytes,
        "Bytes pending compaction for a RocksDB table"
    );
    describe_gauge!("rocksdb.sst_size", Unit::Bytes, "The size of SST files for a RocksDB table");
    describe_gauge!(
        "rocksdb.memtable_size",
        Unit::Bytes,
        "The size of memtables for a RocksDB table"
    );
    // WAL size is tracked separately; see the description string below for the caveat.
    describe_gauge!(
        "rocksdb.wal_size",
        Unit::Bytes,
        "The total size of WAL (Write-Ahead Log) files. Important: this is not included in table_size or sst_size metrics"
    );
}
275
/// Registers descriptions for the jemalloc memory gauges.
///
/// Only compiled when the `jemalloc` feature is enabled on a unix target; a no-op
/// fallback is provided for all other configurations.
#[cfg(all(feature = "jemalloc", unix))]
fn describe_memory_stats() {
    describe_gauge!(
        "jemalloc.active",
        Unit::Bytes,
        "Total number of bytes in active pages allocated by the application"
    );
    describe_gauge!(
        "jemalloc.allocated",
        Unit::Bytes,
        "Total number of bytes allocated by the application"
    );
    describe_gauge!(
        "jemalloc.mapped",
        Unit::Bytes,
        "Total number of bytes in active extents mapped by the allocator"
    );
    describe_gauge!(
        "jemalloc.metadata",
        Unit::Bytes,
        "Total number of bytes dedicated to jemalloc metadata"
    );
    describe_gauge!(
        "jemalloc.resident",
        Unit::Bytes,
        "Total number of bytes in physically resident data pages mapped by the allocator"
    );
    describe_gauge!(
        "jemalloc.retained",
        Unit::Bytes,
        "Total number of bytes in virtual memory mappings that were retained rather than \
        being returned to the operating system via e.g. munmap(2)"
    );
}
310
/// No-op fallback when jemalloc memory stats are unavailable (non-unix or feature off).
#[cfg(not(all(feature = "jemalloc", unix)))]
const fn describe_memory_stats() {}
313
/// Registers descriptions for the per-process I/O counters (Linux only, sourced
/// from the kernel's per-process I/O accounting).
#[cfg(target_os = "linux")]
fn describe_io_stats() {
    use metrics::describe_counter;

    describe_counter!("io.rchar", "Characters read");
    describe_counter!("io.wchar", "Characters written");
    describe_counter!("io.syscr", "Read syscalls");
    describe_counter!("io.syscw", "Write syscalls");
    describe_counter!("io.read_bytes", Unit::Bytes, "Bytes read");
    describe_counter!("io.write_bytes", Unit::Bytes, "Bytes written");
    describe_counter!("io.cancelled_write_bytes", Unit::Bytes, "Cancelled write bytes");
}
326
/// No-op fallback for non-Linux targets, where the I/O counters are not collected.
#[cfg(not(target_os = "linux"))]
const fn describe_io_stats() {}
329
330async fn handle_request(
331    path: &str,
332    hook: impl Fn(),
333    handle: &crate::recorder::PrometheusRecorder,
334    pprof_dump_dir: &PathBuf,
335) -> Response<Full<Bytes>> {
336    match path {
337        "/debug/pprof/heap" => handle_pprof_heap(pprof_dump_dir),
338        "/debug/tokio/dump" => handle_tokio_dump().await,
339        _ => {
340            hook();
341            let metrics = handle.handle().render();
342            let mut response = Response::new(Full::new(Bytes::from(metrics)));
343            response.headers_mut().insert(CONTENT_TYPE, HeaderValue::from_static("text/plain"));
344            response
345        }
346    }
347}
348
/// Serves a jemalloc heap-profile dump as a gzipped pprof payload.
///
/// Responds 500 if profiling is not enabled at runtime or the dump fails, and 503
/// when another dump is already in flight.
#[cfg(all(feature = "jemalloc-prof", unix))]
fn handle_pprof_heap(pprof_dump_dir: &PathBuf) -> Response<Full<Bytes>> {
    use http::header::CONTENT_ENCODING;

    match jemalloc_pprof::PROF_CTL.as_ref() {
        // The try_lock result is the match scrutinee, so its temporary (including the
        // guard inside `Ok`) lives until the end of this inner match — the lock is held
        // for the whole dump, serializing concurrent dump requests.
        Some(prof_ctl) => match prof_ctl.try_lock() {
            Ok(_) => match jemalloc_pprof_dump(pprof_dump_dir) {
                Ok(pprof) => {
                    // NOTE(review): CONTENT_ENCODING: gzip implies the pprof bytes are
                    // already gzip-compressed by the dump pipeline — confirm in pprof_util.
                    let mut response = Response::new(Full::new(Bytes::from(pprof)));
                    response
                        .headers_mut()
                        .insert(CONTENT_TYPE, HeaderValue::from_static("application/octet-stream"));
                    response
                        .headers_mut()
                        .insert(CONTENT_ENCODING, HeaderValue::from_static("gzip"));
                    response
                }
                Err(err) => {
                    let mut response = Response::new(Full::new(Bytes::from(format!(
                        "Failed to dump pprof: {err}"
                    ))));
                    *response.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
                    response
                }
            },
            // Lock already held: a dump is in progress on another request.
            Err(_) => {
                let mut response = Response::new(Full::new(Bytes::from_static(
                    b"Profile dump already in progress. Try again later.",
                )));
                *response.status_mut() = StatusCode::SERVICE_UNAVAILABLE;
                response
            }
        },
        // PROF_CTL is None when jemalloc profiling was not activated at process start.
        None => {
            let mut response = Response::new(Full::new(Bytes::from_static(
                b"jemalloc profiling not enabled. \
                 Set MALLOC_CONF=prof:true or rebuild with jemalloc-prof feature.",
            )));
            *response.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
            response
        }
    }
}
392
/// Equivalent to [`jemalloc_pprof::JemallocProfCtl::dump`], but accepts a directory that the
/// temporary pprof file will be written to. The file is deleted when the function exits.
///
/// Returns the serialized pprof profile bytes on success.
#[cfg(all(feature = "jemalloc-prof", unix))]
fn jemalloc_pprof_dump(pprof_dump_dir: &PathBuf) -> eyre::Result<Vec<u8>> {
    use std::{ffi::CString, io::BufReader};

    use mappings::MAPPINGS;
    use pprof_util::parse_jeheap;
    use tempfile::NamedTempFile;

    // Ensure the target directory exists; the NamedTempFile is auto-deleted on drop,
    // which is what guarantees the "deleted when the function exits" contract above.
    reth_fs_util::create_dir_all(pprof_dump_dir)?;
    let f = NamedTempFile::new_in(pprof_dump_dir)?;
    // Tempfile paths contain no interior NUL bytes, so CString::new cannot fail here.
    let path = CString::new(f.path().as_os_str().as_encoded_bytes()).unwrap();

    // SAFETY: "prof.dump" is documented as being writable and taking a C string as input:
    // http://jemalloc.net/jemalloc.3.html#prof.dump
    unsafe { tikv_jemalloc_ctl::raw::write(b"prof.dump\0", path.as_ptr()) }?;

    // Parse the jemalloc heap dump and symbolize it against the process mappings.
    let dump_reader = BufReader::new(f);
    let profile =
        parse_jeheap(dump_reader, MAPPINGS.as_deref()).map_err(|err| eyre::eyre!(Box::new(err)))?;
    let pprof = profile.to_pprof(("inuse_space", "bytes"), ("space", "bytes"), None);

    Ok(pprof)
}
418
419#[cfg(not(all(feature = "jemalloc-prof", unix)))]
420fn handle_pprof_heap(_pprof_dump_dir: &PathBuf) -> Response<Full<Bytes>> {
421    let mut response = Response::new(Full::new(Bytes::from_static(
422        b"jemalloc pprof support not compiled. Rebuild with the jemalloc-prof feature.",
423    )));
424    *response.status_mut() = StatusCode::NOT_IMPLEMENTED;
425    response
426}
427
/// Serves a plain-text dump of the traces of all tasks on the current tokio runtime.
///
/// Only available when tokio is built with `--cfg tokio_unstable`; a 501 fallback is
/// compiled otherwise.
#[cfg(tokio_unstable)]
async fn handle_tokio_dump() -> Response<Full<Bytes>> {
    let handle = tokio::runtime::Handle::current();
    let dump = handle.dump().await;

    // One "task N:\n<trace>" section per captured task.
    let mut output = String::new();
    for (i, task) in dump.tasks().iter().enumerate() {
        let trace = task.trace();
        output.push_str(&format!("task {i}:\n{trace}\n\n"));
    }

    let mut response = Response::new(Full::new(Bytes::from(output)));
    response.headers_mut().insert(CONTENT_TYPE, HeaderValue::from_static("text/plain"));
    response
}
443
444#[cfg(not(tokio_unstable))]
445async fn handle_tokio_dump() -> Response<Full<Bytes>> {
446    let mut response = Response::new(Full::new(Bytes::from_static(
447        b"tokio task dump not available. Rebuild with RUSTFLAGS=\"--cfg tokio_unstable\" and tokio's `taskdump` feature.",
448    )));
449    *response.status_mut() = StatusCode::NOT_IMPLEMENTED;
450    response
451}
452
#[cfg(test)]
mod tests {
    use super::*;
    use reqwest::Client;
    use reth_tasks::Runtime;
    use socket2::{Domain, Socket, Type};
    use std::net::{SocketAddr, TcpListener};

    /// Picks a free TCP port by binding to 127.0.0.1:0 and returning the assigned address.
    ///
    /// NOTE(review): the listener is dropped before the server re-binds the port, so
    /// this is inherently racy; SO_REUSEADDR mitigates but does not eliminate the window.
    fn get_random_available_addr() -> SocketAddr {
        let addr = &"127.0.0.1:0".parse::<SocketAddr>().unwrap().into();
        let socket = Socket::new(Domain::IPV4, Type::STREAM, None).unwrap();
        socket.set_reuse_address(true).unwrap();
        socket.bind(addr).unwrap();
        socket.listen(1).unwrap();
        let listener = TcpListener::from(socket);
        listener.local_addr().unwrap()
    }

    /// End-to-end smoke test: serve the endpoint, scrape it once, and assert that the
    /// process, version, and storage-settings metrics appear in the rendered output.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_metrics_endpoint() {
        // Install the recorder before serve() so gauge registrations are captured,
        // mirroring how start_prometheus_endpoint() works in the real node launch.
        install_prometheus_recorder();

        // Placeholder info values; only their presence as labels is asserted below.
        let chain_spec_info = ChainSpecInfo { name: "test".to_string() };
        let storage_settings_info = StorageSettingsInfo {
            storage_v2: true,
            pruning_mode: "archive",
            prune_config: r#"{"block_interval":5}"#.to_string(),
        };
        let version_info = VersionInfo {
            version: "test",
            build_timestamp: "test",
            cargo_features: "test",
            git_sha: "test",
            target_triple: "test",
            build_profile: "test",
        };

        let runtime = Runtime::test();

        let hooks = Hooks::builder().build();

        let listen_addr = get_random_available_addr();
        let config = MetricServerConfig::new(
            listen_addr,
            version_info,
            chain_spec_info,
            runtime.clone(),
            hooks,
            std::env::temp_dir(),
        )
        .with_storage_settings_info(storage_settings_info);

        MetricServer::new(config).serve().await.unwrap();

        // Send request to the metrics endpoint
        let url = format!("http://{listen_addr}");
        let response = Client::new().get(&url).send().await.unwrap();
        assert!(response.status().is_success());

        // Check the response body
        let body = response.text().await.unwrap();
        assert!(body.contains("reth_process_cpu_seconds_total"));
        assert!(body.contains("reth_process_start_time_seconds"));
        assert!(body.contains("process_cli_args"), "expected process_cli_args metric in output");
        assert!(body.contains("reth_storage_settings"), "expected storage settings metric");
        assert!(body.contains("storage_v2=\"true\""), "expected storage v2 label");
        assert!(body.contains("pruning_mode=\"archive\""), "expected pruning mode label");
        assert!(body.contains("prune_config="), "expected prune config label");

        // Make sure the runtime is dropped after the test runs.
        drop(runtime);
    }
}