Skip to main content

reth_node_metrics/
server.rs

1use crate::{
2    chain::ChainSpecInfo,
3    hooks::{Hook, Hooks},
4    recorder::install_prometheus_recorder,
5    version::VersionInfo,
6};
7use bytes::Bytes;
8use eyre::WrapErr;
9use http::{header::CONTENT_TYPE, HeaderValue, Request, Response, StatusCode};
10use http_body_util::Full;
11use metrics::describe_gauge;
12use metrics_process::Collector;
13use reqwest::Client;
14use reth_metrics::metrics::Unit;
15use reth_tasks::TaskExecutor;
16use std::{convert::Infallible, net::SocketAddr, path::PathBuf, sync::Arc, time::Duration};
17
18/// Configuration for the [`MetricServer`]
19#[derive(Debug)]
20pub struct MetricServerConfig {
21    listen_addr: SocketAddr,
22    version_info: VersionInfo,
23    chain_spec_info: ChainSpecInfo,
24    task_executor: TaskExecutor,
25    hooks: Hooks,
26    push_gateway_url: Option<String>,
27    push_gateway_interval: Duration,
28    pprof_dump_dir: PathBuf,
29}
30
31impl MetricServerConfig {
32    /// Create a new [`MetricServerConfig`] with the given configuration
33    pub const fn new(
34        listen_addr: SocketAddr,
35        version_info: VersionInfo,
36        chain_spec_info: ChainSpecInfo,
37        task_executor: TaskExecutor,
38        hooks: Hooks,
39        pprof_dump_dir: PathBuf,
40    ) -> Self {
41        Self {
42            listen_addr,
43            hooks,
44            task_executor,
45            version_info,
46            chain_spec_info,
47            push_gateway_url: None,
48            push_gateway_interval: Duration::from_secs(5),
49            pprof_dump_dir,
50        }
51    }
52
53    /// Set the gateway URL and interval for pushing metrics
54    pub fn with_push_gateway(mut self, url: Option<String>, interval: Duration) -> Self {
55        self.push_gateway_url = url;
56        self.push_gateway_interval = interval;
57        self
58    }
59}
60
61/// [`MetricServer`] responsible for serving the metrics endpoint
62#[derive(Debug)]
63pub struct MetricServer {
64    config: MetricServerConfig,
65}
66
67impl MetricServer {
68    /// Create a new [`MetricServer`] with the given configuration
69    pub const fn new(config: MetricServerConfig) -> Self {
70        Self { config }
71    }
72
73    /// Spawns the metrics server
74    pub async fn serve(&self) -> eyre::Result<()> {
75        let MetricServerConfig {
76            listen_addr,
77            hooks,
78            task_executor,
79            version_info,
80            chain_spec_info,
81            push_gateway_url,
82            push_gateway_interval,
83            pprof_dump_dir,
84        } = &self.config;
85
86        let hooks_for_endpoint = hooks.clone();
87        self.start_endpoint(
88            *listen_addr,
89            Arc::new(move || hooks_for_endpoint.iter().for_each(|hook| hook())),
90            task_executor.clone(),
91            pprof_dump_dir.clone(),
92        )
93        .await
94        .wrap_err_with(|| format!("Could not start Prometheus endpoint at {listen_addr}"))?;
95
96        // Start push-gateway task if configured
97        if let Some(url) = push_gateway_url {
98            self.start_push_gateway_task(
99                url.clone(),
100                *push_gateway_interval,
101                hooks.clone(),
102                task_executor.clone(),
103            )?;
104        }
105
106        // Describe metrics after recorder installation
107        describe_db_metrics();
108        describe_static_file_metrics();
109        describe_rocksdb_metrics();
110        Collector::default().describe();
111        describe_memory_stats();
112        describe_io_stats();
113
114        version_info.register_version_metrics();
115        chain_spec_info.register_chain_spec_metrics();
116
117        Ok(())
118    }
119
120    async fn start_endpoint<F: Hook + 'static>(
121        &self,
122        listen_addr: SocketAddr,
123        hook: Arc<F>,
124        task_executor: TaskExecutor,
125        pprof_dump_dir: PathBuf,
126    ) -> eyre::Result<()> {
127        let listener = tokio::net::TcpListener::bind(listen_addr)
128            .await
129            .wrap_err("Could not bind to address")?;
130
131        tracing::info!(target: "reth::cli", "Starting metrics endpoint at {}", listener.local_addr().unwrap());
132
133        task_executor.spawn_with_graceful_shutdown_signal(|mut signal| {
134            Box::pin(async move {
135                loop {
136                    let io = tokio::select! {
137                        _ = &mut signal => break,
138                        io = listener.accept() => {
139                            match io {
140                                Ok((stream, _remote_addr)) => stream,
141                                Err(err) => {
142                                    tracing::error!(%err, "failed to accept connection");
143                                    continue;
144                                }
145                            }
146                        }
147                    };
148
149                    let handle = install_prometheus_recorder();
150                    let hook = hook.clone();
151                    let pprof_dump_dir = pprof_dump_dir.clone();
152                    let service = tower::service_fn(move |req: Request<_>| {
153                        let response =
154                            handle_request(req.uri().path(), &*hook, handle, &pprof_dump_dir);
155                        async move { Ok::<_, Infallible>(response) }
156                    });
157
158                    let mut shutdown = signal.clone().ignore_guard();
159                    tokio::task::spawn(async move {
160                        let _ = jsonrpsee_server::serve_with_graceful_shutdown(
161                            io,
162                            service,
163                            &mut shutdown,
164                        )
165                        .await
166                        .inspect_err(|error| tracing::debug!(%error, "failed to serve request"));
167                    });
168                }
169            })
170        });
171
172        Ok(())
173    }
174
175    /// Starts a background task to push metrics to a metrics gateway
176    fn start_push_gateway_task(
177        &self,
178        url: String,
179        interval: Duration,
180        hooks: Hooks,
181        task_executor: TaskExecutor,
182    ) -> eyre::Result<()> {
183        let client = Client::builder()
184            .build()
185            .wrap_err("Could not create HTTP client to push metrics to gateway")?;
186        task_executor.spawn_with_graceful_shutdown_signal(move |mut signal| {
187            Box::pin(async move {
188                tracing::info!(url = %url, interval = ?interval, "Starting task to push metrics to gateway");
189                let handle = install_prometheus_recorder();
190                loop {
191                    tokio::select! {
192                        _ = &mut signal => {
193                            tracing::info!("Shutting down task to push metrics to gateway");
194                            break;
195                        }
196                        _ = tokio::time::sleep(interval) => {
197                            hooks.iter().for_each(|hook| hook());
198                            let metrics = handle.handle().render();
199                            match client.put(&url).header("Content-Type", "text/plain").body(metrics).send().await {
200                                Ok(response) => {
201                                    if !response.status().is_success() {
202                                        tracing::warn!(
203                                            status = %response.status(),
204                                            "Failed to push metrics to gateway"
205                                        );
206                                    }
207                                }
208                                Err(err) => {
209                                    tracing::warn!(%err, "Failed to push metrics to gateway");
210                                }
211                            }
212                        }
213                    }
214                }
215            })
216        });
217        Ok(())
218    }
219}
220
/// Registers descriptions for the database gauges: per-table size, page and entry counts,
/// freelist pages, page size, and timed-out transactions not yet aborted by the user.
fn describe_db_metrics() {
    describe_gauge!("db.table_size", Unit::Bytes, "The size of a database table (in bytes)");
    describe_gauge!("db.table_pages", "The number of database pages for a table");
    describe_gauge!("db.table_entries", "The number of entries for a table");
    describe_gauge!("db.freelist", "The number of pages on the freelist");
    describe_gauge!("db.page_size", Unit::Bytes, "The size of a database page (in bytes)");
    describe_gauge!(
        "db.timed_out_not_aborted_transactions",
        "Number of timed out transactions that were not aborted by the user yet"
    );
}
232
/// Registers descriptions for the static-file segment gauges: size (bytes), number of files,
/// and number of entries per segment.
fn describe_static_file_metrics() {
    describe_gauge!("static_files.segment_size", Unit::Bytes, "The size of a static file segment");
    describe_gauge!("static_files.segment_files", "The number of files for a static file segment");
    describe_gauge!(
        "static_files.segment_entries",
        "The number of entries for a static file segment"
    );
}
241
/// Registers descriptions for the RocksDB gauges.
///
/// The per-table gauges cover estimated size and key count, pending compaction bytes, and
/// SST and memtable sizes. `rocksdb.wal_size` reports the total WAL size, which per its
/// description is not part of `table_size` or `sst_size`.
fn describe_rocksdb_metrics() {
    describe_gauge!(
        "rocksdb.table_size",
        Unit::Bytes,
        "The estimated size of a RocksDB table (SST + memtable)"
    );
    describe_gauge!("rocksdb.table_entries", "The estimated number of keys in a RocksDB table");
    describe_gauge!(
        "rocksdb.pending_compaction_bytes",
        Unit::Bytes,
        "Bytes pending compaction for a RocksDB table"
    );
    describe_gauge!("rocksdb.sst_size", Unit::Bytes, "The size of SST files for a RocksDB table");
    describe_gauge!(
        "rocksdb.memtable_size",
        Unit::Bytes,
        "The size of memtables for a RocksDB table"
    );
    describe_gauge!(
        "rocksdb.wal_size",
        Unit::Bytes,
        "The total size of WAL (Write-Ahead Log) files. Important: this is not included in table_size or sst_size metrics"
    );
}
266
/// Registers descriptions for the jemalloc allocator statistics gauges (all in bytes).
///
/// Compiled only on Unix with the `jemalloc` feature enabled.
#[cfg(all(feature = "jemalloc", unix))]
fn describe_memory_stats() {
    describe_gauge!(
        "jemalloc.active",
        Unit::Bytes,
        "Total number of bytes in active pages allocated by the application"
    );
    describe_gauge!(
        "jemalloc.allocated",
        Unit::Bytes,
        "Total number of bytes allocated by the application"
    );
    describe_gauge!(
        "jemalloc.mapped",
        Unit::Bytes,
        "Total number of bytes in active extents mapped by the allocator"
    );
    describe_gauge!(
        "jemalloc.metadata",
        Unit::Bytes,
        "Total number of bytes dedicated to jemalloc metadata"
    );
    describe_gauge!(
        "jemalloc.resident",
        Unit::Bytes,
        "Total number of bytes in physically resident data pages mapped by the allocator"
    );
    describe_gauge!(
        "jemalloc.retained",
        Unit::Bytes,
        "Total number of bytes in virtual memory mappings that were retained rather than \
        being returned to the operating system via e.g. munmap(2)"
    );
}

/// No-op fallback used when the `jemalloc` feature is disabled or the target is not Unix.
#[cfg(not(all(feature = "jemalloc", unix)))]
const fn describe_memory_stats() {}
304
305#[cfg(target_os = "linux")]
306fn describe_io_stats() {
307    use metrics::describe_counter;
308
309    describe_counter!("io.rchar", "Characters read");
310    describe_counter!("io.wchar", "Characters written");
311    describe_counter!("io.syscr", "Read syscalls");
312    describe_counter!("io.syscw", "Write syscalls");
313    describe_counter!("io.read_bytes", Unit::Bytes, "Bytes read");
314    describe_counter!("io.write_bytes", Unit::Bytes, "Bytes written");
315    describe_counter!("io.cancelled_write_bytes", Unit::Bytes, "Cancelled write bytes");
316}
317
318#[cfg(not(target_os = "linux"))]
319const fn describe_io_stats() {}
320
321fn handle_request(
322    path: &str,
323    hook: impl Fn(),
324    handle: &crate::recorder::PrometheusRecorder,
325    pprof_dump_dir: &PathBuf,
326) -> Response<Full<Bytes>> {
327    match path {
328        "/debug/pprof/heap" => handle_pprof_heap(pprof_dump_dir),
329        _ => {
330            hook();
331            let metrics = handle.handle().render();
332            let mut response = Response::new(Full::new(Bytes::from(metrics)));
333            response.headers_mut().insert(CONTENT_TYPE, HeaderValue::from_static("text/plain"));
334            response
335        }
336    }
337}
338
/// Serves `/debug/pprof/heap` by dumping a jemalloc heap profile into `pprof_dump_dir`.
///
/// On success the body is the pprof output, served as `application/octet-stream` with
/// `Content-Encoding: gzip`. Otherwise a plain-text error is returned:
/// - 500 when profiling is not available (`PROF_CTL` is `None`).
/// - 503 when another dump already holds the profiling lock.
/// - 500 when the dump itself fails.
#[cfg(all(feature = "jemalloc-prof", unix))]
fn handle_pprof_heap(pprof_dump_dir: &PathBuf) -> Response<Full<Bytes>> {
    use http::header::CONTENT_ENCODING;

    // Builds a response carrying `body` with the given error `status`.
    let error_response = |status: StatusCode, body: Bytes| {
        let mut response = Response::new(Full::new(body));
        *response.status_mut() = status;
        response
    };

    let Some(prof_ctl) = jemalloc_pprof::PROF_CTL.as_ref() else {
        return error_response(
            StatusCode::INTERNAL_SERVER_ERROR,
            Bytes::from_static(
                b"jemalloc profiling not enabled. \
                 Set MALLOC_CONF=prof:true or rebuild with jemalloc-prof feature.",
            ),
        );
    };

    // The guard stays alive until the function returns, so concurrent requests are rejected
    // for the whole duration of the dump.
    let Ok(_guard) = prof_ctl.try_lock() else {
        return error_response(
            StatusCode::SERVICE_UNAVAILABLE,
            Bytes::from_static(b"Profile dump already in progress. Try again later."),
        );
    };

    match jemalloc_pprof_dump(pprof_dump_dir) {
        Ok(pprof) => {
            let mut response = Response::new(Full::new(Bytes::from(pprof)));
            let headers = response.headers_mut();
            headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/octet-stream"));
            headers.insert(CONTENT_ENCODING, HeaderValue::from_static("gzip"));
            response
        }
        Err(err) => error_response(
            StatusCode::INTERNAL_SERVER_ERROR,
            Bytes::from(format!("Failed to dump pprof: {err}")),
        ),
    }
}
382
/// Equivalent to [`jemalloc_pprof::JemallocProfCtl::dump`], but accepts a directory that the
/// temporary pprof file will be written to. The file is deleted when the function exits.
///
/// `pprof_dump_dir` is created if it does not exist yet.
///
/// # Errors
///
/// Returns an error if any of these steps fail:
/// - creating the directory or the temporary file;
/// - converting the temporary file's path into a C string;
/// - jemalloc writing the dump;
/// - parsing the dump.
#[cfg(all(feature = "jemalloc-prof", unix))]
fn jemalloc_pprof_dump(pprof_dump_dir: &std::path::Path) -> eyre::Result<Vec<u8>> {
    use std::{ffi::CString, io::BufReader};

    use mappings::MAPPINGS;
    use pprof_util::parse_jeheap;
    use tempfile::NamedTempFile;

    reth_fs_util::create_dir_all(pprof_dump_dir)?;
    let f = NamedTempFile::new_in(pprof_dump_dir)?;
    // An interior NUL byte cannot normally occur in a path that was just created on Unix, but
    // this runs inside an HTTP request handler, so report an error rather than panic.
    let path = CString::new(f.path().as_os_str().as_encoded_bytes())
        .wrap_err("pprof dump path contains an interior NUL byte")?;

    // SAFETY: "prof.dump" is documented as being writable and taking a C string as input:
    // http://jemalloc.net/jemalloc.3.html#prof.dump
    unsafe { tikv_jemalloc_ctl::raw::write(b"prof.dump\0", path.as_ptr()) }?;

    // The reader takes ownership of the temp file; it is removed when the reader is dropped at
    // the end of this function.
    let dump_reader = BufReader::new(f);
    let profile =
        parse_jeheap(dump_reader, MAPPINGS.as_deref()).map_err(|err| eyre::eyre!(Box::new(err)))?;
    let pprof = profile.to_pprof(("inuse_space", "bytes"), ("space", "bytes"), None);

    Ok(pprof)
}
408
409#[cfg(not(all(feature = "jemalloc-prof", unix)))]
410fn handle_pprof_heap(_pprof_dump_dir: &PathBuf) -> Response<Full<Bytes>> {
411    let mut response = Response::new(Full::new(Bytes::from_static(
412        b"jemalloc pprof support not compiled. Rebuild with the jemalloc-prof feature.",
413    )));
414    *response.status_mut() = StatusCode::NOT_IMPLEMENTED;
415    response
416}
417
#[cfg(test)]
mod tests {
    use super::*;
    use reqwest::Client;
    use reth_tasks::Runtime;
    use socket2::{Domain, Socket, Type};
    use std::net::{SocketAddr, TcpListener};

    /// Value used for every string field of the test version/chain-spec info.
    const PLACEHOLDER: &str = "test";

    /// Returns a loopback address whose port the OS reported as free.
    ///
    /// An `SO_REUSEADDR` socket is bound to port 0 and put into listening mode, and its local
    /// address is read back. The socket is closed when this returns, so the server under test
    /// can bind the same port.
    fn get_random_available_addr() -> SocketAddr {
        let any_port = SocketAddr::from(([127, 0, 0, 1], 0));
        let socket = Socket::new(Domain::IPV4, Type::STREAM, None).unwrap();
        socket.set_reuse_address(true).unwrap();
        socket.bind(&any_port.into()).unwrap();
        socket.listen(1).unwrap();
        TcpListener::from(socket).local_addr().unwrap()
    }

    /// Starts a [`MetricServer`] on a free port and checks that a plain `GET` succeeds and
    /// includes the process metrics.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_metrics_endpoint() {
        let chain_spec_info = ChainSpecInfo { name: PLACEHOLDER.to_string() };
        let version_info = VersionInfo {
            version: PLACEHOLDER,
            build_timestamp: PLACEHOLDER,
            cargo_features: PLACEHOLDER,
            git_sha: PLACEHOLDER,
            target_triple: PLACEHOLDER,
            build_profile: PLACEHOLDER,
        };

        // Run the server tasks on the test's own tokio runtime.
        let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current()).unwrap();
        let hooks = Hooks::builder().build();
        let listen_addr = get_random_available_addr();

        MetricServer::new(MetricServerConfig::new(
            listen_addr,
            version_info,
            chain_spec_info,
            runtime.clone(),
            hooks,
            std::env::temp_dir(),
        ))
        .serve()
        .await
        .unwrap();

        let endpoint = format!("http://{listen_addr}");
        let response = Client::new().get(&endpoint).send().await.unwrap();
        assert!(response.status().is_success());

        // The rendered body must contain the process metrics.
        let body = response.text().await.unwrap();
        assert!(body.contains("reth_process_cpu_seconds_total"));
        assert!(body.contains("reth_process_start_time_seconds"));

        // Make sure the runtime is dropped after the test runs.
        drop(runtime);
    }
}
477}