reth_bench/bench/
metrics_scraper.rs1use csv::Writer;
7use eyre::Context;
8use reqwest::Client;
9use serde::Serialize;
10use std::{path::Path, time::Duration};
11use tracing::info;
12
/// File name of the CSV file that [`MetricsScraper::write_csv`] creates inside the output
/// directory it is given.
pub(crate) const METRICS_OUTPUT_SUFFIX: &str = "metrics.csv";
15
/// One scraped sample: the metric values read from the endpoint after a given block.
///
/// Rows are serialized to CSV as-is, so the field order below is the column order.
#[derive(Debug, Clone, Serialize)]
pub(crate) struct MetricsRow {
    /// Block number the scrape was taken after.
    pub(crate) block_number: u64,
    /// Value of the `sync_execution_execution_duration` metric, or `None` when the scraped text
    /// contained no parsable sample for it.
    pub(crate) execution_duration_secs: Option<f64>,
    /// Value of the `sync_block_validation_state_root_duration` metric, or `None` when the
    /// scraped text contained no parsable sample for it.
    pub(crate) state_root_duration_secs: Option<f64>,
}
27
/// Collects metric samples from an HTTP metrics endpoint and writes them out as CSV.
pub(crate) struct MetricsScraper {
    /// Address of the metrics endpoint; fetched with a GET request on every scrape.
    url: String,
    /// HTTP client reused for all scrapes.
    client: Client,
    /// Samples gathered so far, in the order they were scraped.
    rows: Vec<MetricsRow>,
}
38
39impl MetricsScraper {
40 pub(crate) fn maybe_new(url: Option<String>) -> Option<Self> {
42 url.map(|url| {
43 info!(target: "reth-bench", %url, "Prometheus metrics scraping enabled");
44 let client = Client::builder()
45 .timeout(Duration::from_secs(5))
46 .build()
47 .expect("failed to build reqwest client");
48 Self { url, client, rows: Vec::new() }
49 })
50 }
51
52 pub(crate) async fn scrape_after_block(&mut self, block_number: u64) -> eyre::Result<()> {
54 let text = self
55 .client
56 .get(&self.url)
57 .send()
58 .await
59 .wrap_err("failed to fetch metrics endpoint")?
60 .error_for_status()
61 .wrap_err("metrics endpoint returned error status")?
62 .text()
63 .await
64 .wrap_err("failed to read metrics response body")?;
65
66 let execution = parse_gauge(&text, "sync_execution_execution_duration");
67 let state_root = parse_gauge(&text, "sync_block_validation_state_root_duration");
68
69 self.rows.push(MetricsRow {
70 block_number,
71 execution_duration_secs: execution,
72 state_root_duration_secs: state_root,
73 });
74 Ok(())
75 }
76
77 pub(crate) fn write_csv(&self, output_dir: &Path) -> eyre::Result<()> {
79 let path = output_dir.join(METRICS_OUTPUT_SUFFIX);
80 info!(target: "reth-bench", "Writing scraped metrics to file: {:?}", path);
81 let mut writer = Writer::from_path(&path)?;
82 for row in &self.rows {
83 writer.serialize(row)?;
84 }
85 writer.flush()?;
86 Ok(())
87 }
88}
89
/// Returns the value of the metric `name` found in Prometheus text exposition output, or `None`
/// when no line carries a parsable value for it.
///
/// Blank lines and `#` lines (HELP/TYPE metadata, comments) are ignored. A line only counts as a
/// sample of `name` when the name is directly followed by whitespace or by a `{...}` label set,
/// so metrics that merely share the prefix (for example `<name>_total`) are skipped. When several
/// lines match, the value of the last parsable one is returned. A trailing timestamp after the
/// value is ignored.
fn parse_gauge(text: &str, name: &str) -> Option<f64> {
    text.lines().filter_map(|line| parse_sample_value(line.trim(), name)).last()
}

/// Parses one already-trimmed exposition line, returning its value if it is a sample of `name`.
fn parse_sample_value(line: &str, name: &str) -> Option<f64> {
    // `#` introduces HELP/TYPE metadata and comments, never a sample.
    if line.starts_with('#') {
        return None;
    }

    let rest = line.strip_prefix(name)?;
    let after_name = match rest.chars().next()? {
        // Skip the whole label set so that whitespace inside it (`{a="x", b="y z"}`) cannot be
        // mistaken for the separator in front of the value.
        '{' => &rest[label_set_len(rest)?..],
        // Space or tab separates a bare metric name from its value.
        c if c.is_whitespace() => rest,
        // `name` is only a prefix of a longer metric name.
        _ => return None,
    };

    // The first token after the name/labels is the value; an optional timestamp may follow.
    after_name.split_whitespace().next()?.parse().ok()
}

/// Returns the byte length of the `{...}` label set that `labels` starts with, including both
/// braces, or `None` if the set is never closed.
///
/// Quoted label values are skipped over, honouring backslash escapes, so a `}` or `"` inside a
/// value does not end the set early.
fn label_set_len(labels: &str) -> Option<usize> {
    let mut in_quotes = false;
    let mut escaped = false;
    for (idx, ch) in labels.char_indices() {
        if escaped {
            // Character following a backslash inside a quoted value: taken literally.
            escaped = false;
        } else if in_quotes {
            match ch {
                '\\' => escaped = true,
                '"' => in_quotes = false,
                _ => {}
            }
        } else {
            match ch {
                '"' => in_quotes = true,
                // `}` is a single byte, so the set ends one past its index.
                '}' => return Some(idx + 1),
                _ => {}
            }
        }
    }
    None
}
124
#[cfg(test)]
mod tests {
    use super::*;

    /// Metric name used by the execution-duration cases.
    const EXECUTION: &str = "sync_execution_execution_duration";
    /// Metric name used by the labelled state-root case.
    const STATE_ROOT: &str = "sync_block_validation_state_root_duration";

    /// A bare sample preceded by HELP/TYPE metadata is parsed.
    #[test]
    fn test_parse_gauge_simple() {
        let exposition = r#"# HELP sync_execution_execution_duration Duration of execution
# TYPE sync_execution_execution_duration gauge
sync_execution_execution_duration 0.123456
"#;
        let parsed = parse_gauge(exposition, EXECUTION);
        assert_eq!(parsed, Some(0.123456));
    }

    /// A metric that does not occur in the text yields `None`.
    #[test]
    fn test_parse_gauge_missing() {
        let exposition = "some_other_metric 1.0\n";
        let parsed = parse_gauge(exposition, EXECUTION);
        assert_eq!(parsed, None);
    }

    /// A sample carrying a label set is parsed.
    #[test]
    fn test_parse_gauge_with_labels() {
        let exposition = "sync_block_validation_state_root_duration{instance=\"node1\"} 0.5\n";
        let parsed = parse_gauge(exposition, STATE_ROOT);
        assert_eq!(parsed, Some(0.5));
    }

    /// A longer metric name sharing the prefix must not be picked up.
    #[test]
    fn test_parse_gauge_prefix_no_false_match() {
        let exposition =
            "sync_execution_execution_duration_total 99.0\nsync_execution_execution_duration 0.5\n";
        let parsed = parse_gauge(exposition, EXECUTION);
        assert_eq!(parsed, Some(0.5));
    }
}