Skip to main content

reth_node_metrics/
server.rs

1use crate::{
2    chain::ChainSpecInfo,
3    hooks::{Hook, Hooks},
4    recorder::install_prometheus_recorder,
5    version::VersionInfo,
6};
7use bytes::Bytes;
8use eyre::WrapErr;
9use http::{header::CONTENT_TYPE, HeaderValue, Request, Response, StatusCode};
10use http_body_util::Full;
11use metrics::describe_gauge;
12use metrics_process::Collector;
13use reqwest::Client;
14use reth_metrics::metrics::Unit;
15use reth_tasks::TaskExecutor;
16use std::{convert::Infallible, net::SocketAddr, path::PathBuf, sync::Arc, time::Duration};
17
/// Configuration for the [`MetricServer`]
///
/// Built with [`MetricServerConfig::new`]; the optional push gateway is configured through
/// [`MetricServerConfig::with_push_gateway`].
#[derive(Debug)]
pub struct MetricServerConfig {
    /// Address the HTTP metrics endpoint binds to.
    listen_addr: SocketAddr,
    /// Build/version information registered as metrics when the server starts.
    version_info: VersionInfo,
    /// Chain specification information registered as metrics when the server starts.
    chain_spec_info: ChainSpecInfo,
    /// Executor used to spawn the endpoint task and the push-gateway task.
    task_executor: TaskExecutor,
    /// Hooks that are run right before metrics are rendered.
    hooks: Hooks,
    /// Push gateway URL; when `None`, no push task is started.
    push_gateway_url: Option<String>,
    /// Delay between consecutive pushes to the gateway.
    push_gateway_interval: Duration,
    /// Directory in which temporary heap-profile dumps are written when `/debug/pprof/heap`
    /// is requested.
    pprof_dump_dir: PathBuf,
}
30
31impl MetricServerConfig {
32    /// Create a new [`MetricServerConfig`] with the given configuration
33    pub const fn new(
34        listen_addr: SocketAddr,
35        version_info: VersionInfo,
36        chain_spec_info: ChainSpecInfo,
37        task_executor: TaskExecutor,
38        hooks: Hooks,
39        pprof_dump_dir: PathBuf,
40    ) -> Self {
41        Self {
42            listen_addr,
43            hooks,
44            task_executor,
45            version_info,
46            chain_spec_info,
47            push_gateway_url: None,
48            push_gateway_interval: Duration::from_secs(5),
49            pprof_dump_dir,
50        }
51    }
52
53    /// Set the gateway URL and interval for pushing metrics
54    pub fn with_push_gateway(mut self, url: Option<String>, interval: Duration) -> Self {
55        self.push_gateway_url = url;
56        self.push_gateway_interval = interval;
57        self
58    }
59}
60
/// [`MetricServer`] responsible for serving the metrics endpoint
///
/// Holds a [`MetricServerConfig`]; call [`MetricServer::serve`] to start serving.
#[derive(Debug)]
pub struct MetricServer {
    /// Settings consumed by [`MetricServer::serve`].
    config: MetricServerConfig,
}
66
67impl MetricServer {
68    /// Create a new [`MetricServer`] with the given configuration
69    pub const fn new(config: MetricServerConfig) -> Self {
70        Self { config }
71    }
72
73    /// Spawns the metrics server
74    pub async fn serve(&self) -> eyre::Result<()> {
75        let MetricServerConfig {
76            listen_addr,
77            hooks,
78            task_executor,
79            version_info,
80            chain_spec_info,
81            push_gateway_url,
82            push_gateway_interval,
83            pprof_dump_dir,
84        } = &self.config;
85
86        let hooks_for_endpoint = hooks.clone();
87        self.start_endpoint(
88            *listen_addr,
89            Arc::new(move || hooks_for_endpoint.iter().for_each(|hook| hook())),
90            task_executor.clone(),
91            pprof_dump_dir.clone(),
92        )
93        .await
94        .wrap_err_with(|| format!("Could not start Prometheus endpoint at {listen_addr}"))?;
95
96        // Start push-gateway task if configured
97        if let Some(url) = push_gateway_url {
98            self.start_push_gateway_task(
99                url.clone(),
100                *push_gateway_interval,
101                hooks.clone(),
102                task_executor.clone(),
103            )?;
104        }
105
106        // Describe metrics after recorder installation
107        describe_db_metrics();
108        describe_static_file_metrics();
109        describe_rocksdb_metrics();
110        Collector::default().describe();
111        describe_memory_stats();
112        describe_io_stats();
113
114        version_info.register_version_metrics();
115        chain_spec_info.register_chain_spec_metrics();
116
117        Ok(())
118    }
119
120    async fn start_endpoint<F: Hook + 'static>(
121        &self,
122        listen_addr: SocketAddr,
123        hook: Arc<F>,
124        task_executor: TaskExecutor,
125        pprof_dump_dir: PathBuf,
126    ) -> eyre::Result<()> {
127        let listener = tokio::net::TcpListener::bind(listen_addr)
128            .await
129            .wrap_err("Could not bind to address")?;
130
131        tracing::info!(target: "reth::cli", "Starting metrics endpoint at {}", listener.local_addr().unwrap());
132
133        task_executor.spawn_with_graceful_shutdown_signal(async move |mut signal| loop {
134            let io = tokio::select! {
135                _ = &mut signal => break,
136                io = listener.accept() => {
137                    match io {
138                        Ok((stream, _remote_addr)) => stream,
139                        Err(err) => {
140                            tracing::error!(%err, "failed to accept connection");
141                            continue;
142                        }
143                    }
144                }
145            };
146
147            let handle = install_prometheus_recorder();
148            let hook = hook.clone();
149            let pprof_dump_dir = pprof_dump_dir.clone();
150            let service = tower::service_fn(move |req: Request<_>| {
151                let hook = hook.clone();
152                let pprof_dump_dir = pprof_dump_dir.clone();
153                async move {
154                    let response =
155                        handle_request(req.uri().path(), &*hook, handle, &pprof_dump_dir).await;
156                    Ok::<_, Infallible>(response)
157                }
158            });
159
160            let mut shutdown = signal.clone().ignore_guard();
161            tokio::task::spawn(async move {
162                let _ = jsonrpsee_server::serve_with_graceful_shutdown(io, service, &mut shutdown)
163                    .await
164                    .inspect_err(|error| tracing::debug!(%error, "failed to serve request"));
165            });
166        });
167
168        Ok(())
169    }
170
171    /// Starts a background task to push metrics to a metrics gateway
172    fn start_push_gateway_task(
173        &self,
174        url: String,
175        interval: Duration,
176        hooks: Hooks,
177        task_executor: TaskExecutor,
178    ) -> eyre::Result<()> {
179        let client = Client::builder()
180            .build()
181            .wrap_err("Could not create HTTP client to push metrics to gateway")?;
182        task_executor.spawn_with_graceful_shutdown_signal(async move |mut signal| {
183            tracing::info!(url = %url, interval = ?interval, "Starting task to push metrics to gateway");
184            let handle = install_prometheus_recorder();
185            loop {
186                tokio::select! {
187                    _ = &mut signal => {
188                        tracing::info!("Shutting down task to push metrics to gateway");
189                        break;
190                    }
191                    _ = tokio::time::sleep(interval) => {
192                        hooks.iter().for_each(|hook| hook());
193                        let metrics = handle.handle().render();
194                        match client.put(&url).header("Content-Type", "text/plain").body(metrics).send().await {
195                            Ok(response) => {
196                                if !response.status().is_success() {
197                                    tracing::warn!(
198                                        status = %response.status(),
199                                        "Failed to push metrics to gateway"
200                                    );
201                                }
202                            }
203                            Err(err) => {
204                                tracing::warn!(%err, "Failed to push metrics to gateway");
205                            }
206                        }
207                    }
208                }
209            }
210        });
211        Ok(())
212    }
213}
214
215fn describe_db_metrics() {
216    describe_gauge!("db.table_size", Unit::Bytes, "The size of a database table (in bytes)");
217    describe_gauge!("db.table_pages", "The number of database pages for a table");
218    describe_gauge!("db.table_entries", "The number of entries for a table");
219    describe_gauge!("db.freelist", "The number of pages on the freelist");
220    describe_gauge!("db.page_size", Unit::Bytes, "The size of a database page (in bytes)");
221    describe_gauge!(
222        "db.timed_out_not_aborted_transactions",
223        "Number of timed out transactions that were not aborted by the user yet"
224    );
225}
226
227fn describe_static_file_metrics() {
228    describe_gauge!("static_files.segment_size", Unit::Bytes, "The size of a static file segment");
229    describe_gauge!("static_files.segment_files", "The number of files for a static file segment");
230    describe_gauge!(
231        "static_files.segment_entries",
232        "The number of entries for a static file segment"
233    );
234}
235
236fn describe_rocksdb_metrics() {
237    describe_gauge!(
238        "rocksdb.table_size",
239        Unit::Bytes,
240        "The estimated size of a RocksDB table (SST + memtable)"
241    );
242    describe_gauge!("rocksdb.table_entries", "The estimated number of keys in a RocksDB table");
243    describe_gauge!(
244        "rocksdb.pending_compaction_bytes",
245        Unit::Bytes,
246        "Bytes pending compaction for a RocksDB table"
247    );
248    describe_gauge!("rocksdb.sst_size", Unit::Bytes, "The size of SST files for a RocksDB table");
249    describe_gauge!(
250        "rocksdb.memtable_size",
251        Unit::Bytes,
252        "The size of memtables for a RocksDB table"
253    );
254    describe_gauge!(
255        "rocksdb.wal_size",
256        Unit::Bytes,
257        "The total size of WAL (Write-Ahead Log) files. Important: this is not included in table_size or sst_size metrics"
258    );
259}
260
/// Registers descriptions for the `jemalloc.*` gauges, all of which are measured in bytes.
#[cfg(all(feature = "jemalloc", unix))]
fn describe_memory_stats() {
    let gauges = [
        ("jemalloc.active", "Total number of bytes in active pages allocated by the application"),
        ("jemalloc.allocated", "Total number of bytes allocated by the application"),
        ("jemalloc.mapped", "Total number of bytes in active extents mapped by the allocator"),
        ("jemalloc.metadata", "Total number of bytes dedicated to jemalloc metadata"),
        (
            "jemalloc.resident",
            "Total number of bytes in physically resident data pages mapped by the allocator",
        ),
        (
            "jemalloc.retained",
            "Total number of bytes in virtual memory mappings that were retained rather than \
            being returned to the operating system via e.g. munmap(2)",
        ),
    ];

    for (name, description) in gauges {
        describe_gauge!(name, Unit::Bytes, description);
    }
}
295
/// No-op variant used when the `jemalloc` feature is off or the target is not unix.
#[cfg(not(all(feature = "jemalloc", unix)))]
const fn describe_memory_stats() {
    // No jemalloc gauges are described under this configuration.
}
298
299#[cfg(target_os = "linux")]
300fn describe_io_stats() {
301    use metrics::describe_counter;
302
303    describe_counter!("io.rchar", "Characters read");
304    describe_counter!("io.wchar", "Characters written");
305    describe_counter!("io.syscr", "Read syscalls");
306    describe_counter!("io.syscw", "Write syscalls");
307    describe_counter!("io.read_bytes", Unit::Bytes, "Bytes read");
308    describe_counter!("io.write_bytes", Unit::Bytes, "Bytes written");
309    describe_counter!("io.cancelled_write_bytes", Unit::Bytes, "Cancelled write bytes");
310}
311
/// No-op variant for non-Linux targets.
#[cfg(not(target_os = "linux"))]
const fn describe_io_stats() {
    // No I/O counters are described on this target.
}
314
315async fn handle_request(
316    path: &str,
317    hook: impl Fn(),
318    handle: &crate::recorder::PrometheusRecorder,
319    pprof_dump_dir: &PathBuf,
320) -> Response<Full<Bytes>> {
321    match path {
322        "/debug/pprof/heap" => handle_pprof_heap(pprof_dump_dir),
323        "/debug/tokio/dump" => handle_tokio_dump().await,
324        _ => {
325            hook();
326            let metrics = handle.handle().render();
327            let mut response = Response::new(Full::new(Bytes::from(metrics)));
328            response.headers_mut().insert(CONTENT_TYPE, HeaderValue::from_static("text/plain"));
329            response
330        }
331    }
332}
333
/// Serves `/debug/pprof/heap` with a gzip-encoded pprof heap profile produced by jemalloc.
///
/// Answers `500` when jemalloc profiling is unavailable or the dump fails, and `503` when the
/// profiling-control lock is already held.
#[cfg(all(feature = "jemalloc-prof", unix))]
fn handle_pprof_heap(pprof_dump_dir: &PathBuf) -> Response<Full<Bytes>> {
    use http::header::CONTENT_ENCODING;

    /// Builds a response with the given status and body and no extra headers.
    fn status_response(status: StatusCode, body: Bytes) -> Response<Full<Bytes>> {
        let mut response = Response::new(Full::new(body));
        *response.status_mut() = status;
        response
    }

    let Some(prof_ctl) = jemalloc_pprof::PROF_CTL.as_ref() else {
        return status_response(
            StatusCode::INTERNAL_SERVER_ERROR,
            Bytes::from_static(
                b"jemalloc profiling not enabled. \
                 Set MALLOC_CONF=prof:true or rebuild with jemalloc-prof feature.",
            ),
        );
    };

    // Keep the guard alive for the whole dump. While it is held, other requests take the 503
    // branch instead of starting a second dump.
    let Ok(_guard) = prof_ctl.try_lock() else {
        return status_response(
            StatusCode::SERVICE_UNAVAILABLE,
            Bytes::from_static(b"Profile dump already in progress. Try again later."),
        );
    };

    // NOTE(review): the dump performs synchronous file I/O and is called from the async
    // request handler. Consider `spawn_blocking` if dumps become slow.
    match jemalloc_pprof_dump(pprof_dump_dir) {
        Ok(pprof) => {
            let mut response = Response::new(Full::new(Bytes::from(pprof)));
            let headers = response.headers_mut();
            headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/octet-stream"));
            headers.insert(CONTENT_ENCODING, HeaderValue::from_static("gzip"));
            response
        }
        Err(err) => status_response(
            StatusCode::INTERNAL_SERVER_ERROR,
            Bytes::from(format!("Failed to dump pprof: {err}")),
        ),
    }
}
377
/// Equivalent to [`jemalloc_pprof::JemallocProfCtl::dump`], but accepts a directory that the
/// temporary pprof file will be written to. The file is deleted when the function exits.
///
/// The function first creates `pprof_dump_dir` if needed. It then has jemalloc write a heap
/// profile into a temporary file inside that directory and converts the profile to pprof
/// format (`inuse_space` / `space` sample types, in bytes).
///
/// # Errors
///
/// Returns an error in any of these cases:
/// - the directory or the temporary file cannot be created;
/// - the temporary file's path contains an interior NUL byte;
/// - jemalloc rejects the `prof.dump` request;
/// - the dump cannot be parsed.
#[cfg(all(feature = "jemalloc-prof", unix))]
fn jemalloc_pprof_dump(pprof_dump_dir: &std::path::Path) -> eyre::Result<Vec<u8>> {
    use std::{ffi::CString, io::BufReader};

    use mappings::MAPPINGS;
    use pprof_util::parse_jeheap;
    use tempfile::NamedTempFile;

    reth_fs_util::create_dir_all(pprof_dump_dir)?;
    let f = NamedTempFile::new_in(pprof_dump_dir)?;
    // jemalloc needs a NUL-terminated C string. A malformed path becomes an error instead of
    // panicking inside the HTTP handler.
    let path = CString::new(f.path().as_os_str().as_encoded_bytes())
        .wrap_err("pprof dump path contains an interior NUL byte")?;

    // SAFETY: "prof.dump" is documented as being writable and taking a C string as input:
    // http://jemalloc.net/jemalloc.3.html#prof.dump
    unsafe { tikv_jemalloc_ctl::raw::write(b"prof.dump\0", path.as_ptr()) }?;

    let dump_reader = BufReader::new(f);
    let profile =
        parse_jeheap(dump_reader, MAPPINGS.as_deref()).map_err(|err| eyre::eyre!(Box::new(err)))?;
    let pprof = profile.to_pprof(("inuse_space", "bytes"), ("space", "bytes"), None);

    Ok(pprof)
}
403
404#[cfg(not(all(feature = "jemalloc-prof", unix)))]
405fn handle_pprof_heap(_pprof_dump_dir: &PathBuf) -> Response<Full<Bytes>> {
406    let mut response = Response::new(Full::new(Bytes::from_static(
407        b"jemalloc pprof support not compiled. Rebuild with the jemalloc-prof feature.",
408    )));
409    *response.status_mut() = StatusCode::NOT_IMPLEMENTED;
410    response
411}
412
/// Serves `/debug/tokio/dump` with a plain-text trace of every task in the current Tokio
/// runtime. Each entry has the form `task <index>:\n<trace>\n\n`.
#[cfg(tokio_unstable)]
async fn handle_tokio_dump() -> Response<Full<Bytes>> {
    use std::fmt::Write as _;

    let handle = tokio::runtime::Handle::current();
    let dump = handle.dump().await;

    let mut output = String::new();
    for (i, task) in dump.tasks().iter().enumerate() {
        let trace = task.trace();
        // Format directly into the buffer instead of allocating a temporary `String` per task.
        // Writing into a `String` cannot fail, so the `fmt::Result` is ignored.
        let _ = write!(output, "task {i}:\n{trace}\n\n");
    }

    let mut response = Response::new(Full::new(Bytes::from(output)));
    response.headers_mut().insert(CONTENT_TYPE, HeaderValue::from_static("text/plain"));
    response
}
428
429#[cfg(not(tokio_unstable))]
430async fn handle_tokio_dump() -> Response<Full<Bytes>> {
431    let mut response = Response::new(Full::new(Bytes::from_static(
432        b"tokio task dump not available. Rebuild with RUSTFLAGS=\"--cfg tokio_unstable\" and tokio's `taskdump` feature.",
433    )));
434    *response.status_mut() = StatusCode::NOT_IMPLEMENTED;
435    response
436}
437
#[cfg(test)]
mod tests {
    use super::*;
    use reqwest::Client;
    use reth_tasks::Runtime;
    use socket2::{Domain, Socket, Type};
    use std::net::{SocketAddr, TcpListener};

    /// Binds a throwaway IPv4 loopback socket to an OS-assigned port and returns the address it
    /// got. The socket is closed again before returning.
    fn get_random_available_addr() -> SocketAddr {
        let any_port: SocketAddr = "127.0.0.1:0".parse().unwrap();
        let socket = Socket::new(Domain::IPV4, Type::STREAM, None).unwrap();
        socket.set_reuse_address(true).unwrap();
        socket.bind(&any_port.into()).unwrap();
        socket.listen(1).unwrap();
        TcpListener::from(socket).local_addr().unwrap()
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_metrics_endpoint() {
        let chain_spec_info = ChainSpecInfo { name: "test".to_string() };
        let version_info = VersionInfo {
            version: "test",
            build_timestamp: "test",
            cargo_features: "test",
            git_sha: "test",
            target_triple: "test",
            build_profile: "test",
        };
        let runtime = Runtime::test();
        let hooks = Hooks::builder().build();
        let listen_addr = get_random_available_addr();

        let config = MetricServerConfig::new(
            listen_addr,
            version_info,
            chain_spec_info,
            runtime.clone(),
            hooks,
            std::env::temp_dir(),
        );
        MetricServer::new(config).serve().await.unwrap();

        // Scrape the endpoint over plain HTTP.
        let response = Client::new().get(format!("http://{listen_addr}")).send().await.unwrap();
        assert!(response.status().is_success());

        // Process metrics must be present in the rendered body.
        let body = response.text().await.unwrap();
        for expected in ["reth_process_cpu_seconds_total", "reth_process_start_time_seconds"] {
            assert!(body.contains(expected), "metrics body is missing {expected}");
        }

        // Make sure the runtime is dropped after the test runs.
        drop(runtime);
    }
}
497}