reth_bench_compare/
benchmark.rs1use crate::cli::Args;
4use eyre::{eyre, Result, WrapErr};
5use std::{
6 path::Path,
7 sync::{Arc, Mutex},
8};
9use tokio::{
10 fs::File as AsyncFile,
11 io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
12 process::Command,
13};
14use tracing::{debug, error, info, warn};
15
16pub(crate) struct BenchmarkRunner {
18 rpc_url: String,
19 jwt_secret: String,
20 wait_time: Option<String>,
21 warmup_blocks: u64,
22}
23
24impl BenchmarkRunner {
25 pub(crate) fn new(args: &Args) -> Self {
27 Self {
28 rpc_url: args.get_rpc_url(),
29 jwt_secret: args.jwt_secret_path().to_string_lossy().to_string(),
30 wait_time: args.wait_time.clone(),
31 warmup_blocks: args.get_warmup_blocks(),
32 }
33 }
34
35 pub(crate) async fn clear_fs_caches() -> Result<()> {
37 info!("Clearing filesystem caches...");
38
39 let sync_output =
41 Command::new("sync").output().await.wrap_err("Failed to execute sync command")?;
42
43 if !sync_output.status.success() {
44 return Err(eyre!("sync command failed"));
45 }
46
47 let drop_caches_cmd = Command::new("sudo")
50 .args(["-n", "sh", "-c", "echo 3 > /proc/sys/vm/drop_caches"])
51 .output()
52 .await;
53
54 match drop_caches_cmd {
55 Ok(output) if output.status.success() => {
56 info!("Successfully cleared filesystem caches");
57 Ok(())
58 }
59 Ok(output) => {
60 let stderr = String::from_utf8_lossy(&output.stderr);
61 if stderr.contains("sudo: a password is required") {
62 warn!("Unable to clear filesystem caches: sudo password required");
63 warn!(
64 "For optimal benchmarking, configure passwordless sudo for cache clearing:"
65 );
66 warn!(" echo '$USER ALL=(ALL) NOPASSWD: /bin/sh -c echo\\\\ [0-9]\\\\ \\\\>\\\\ /proc/sys/vm/drop_caches' | sudo tee /etc/sudoers.d/drop_caches");
67 Ok(())
68 } else {
69 Err(eyre!("Failed to clear filesystem caches: {}", stderr))
70 }
71 }
72 Err(e) => {
73 warn!("Unable to clear filesystem caches: {}", e);
74 Ok(())
75 }
76 }
77 }
78
79 pub(crate) async fn run_warmup(&self, from_block: u64) -> Result<()> {
81 let to_block = from_block + self.warmup_blocks;
82 info!(
83 "Running warmup benchmark from block {} to {} ({} blocks)",
84 from_block, to_block, self.warmup_blocks
85 );
86
87 let mut cmd = Command::new("reth-bench");
89 cmd.args([
90 "new-payload-fcu",
91 "--rpc-url",
92 &self.rpc_url,
93 "--jwt-secret",
94 &self.jwt_secret,
95 "--from",
96 &from_block.to_string(),
97 "--to",
98 &to_block.to_string(),
99 ]);
100
101 if let Some(ref wait_time) = self.wait_time {
103 cmd.args(["--wait-time", wait_time]);
104 }
105
106 cmd.env("RUST_LOG_STYLE", "never")
107 .stdout(std::process::Stdio::piped())
108 .stderr(std::process::Stdio::piped())
109 .kill_on_drop(true);
110
111 #[cfg(unix)]
113 {
114 cmd.process_group(0);
115 }
116
117 debug!("Executing warmup reth-bench command: {:?}", cmd);
118
119 let mut child = cmd.spawn().wrap_err("Failed to start warmup reth-bench process")?;
121
122 if let Some(stdout) = child.stdout.take() {
124 tokio::spawn(async move {
125 let reader = BufReader::new(stdout);
126 let mut lines = reader.lines();
127 while let Ok(Some(line)) = lines.next_line().await {
128 debug!("[WARMUP] {}", line);
129 }
130 });
131 }
132
133 if let Some(stderr) = child.stderr.take() {
134 tokio::spawn(async move {
135 let reader = BufReader::new(stderr);
136 let mut lines = reader.lines();
137 while let Ok(Some(line)) = lines.next_line().await {
138 debug!("[WARMUP] {}", line);
139 }
140 });
141 }
142
143 let status = child.wait().await.wrap_err("Failed to wait for warmup reth-bench")?;
144
145 if !status.success() {
146 return Err(eyre!("Warmup reth-bench failed with exit code: {:?}", status.code()));
147 }
148
149 info!("Warmup completed successfully");
150 Ok(())
151 }
152
153 pub(crate) async fn run_benchmark(
155 &self,
156 from_block: u64,
157 to_block: u64,
158 output_dir: &Path,
159 ) -> Result<()> {
160 info!(
161 "Running benchmark from block {} to {} (output: {:?})",
162 from_block, to_block, output_dir
163 );
164
165 std::fs::create_dir_all(output_dir)
167 .wrap_err_with(|| format!("Failed to create output directory: {output_dir:?}"))?;
168
169 let log_file_path = output_dir.join("reth_bench.log");
171 info!("reth-bench logs will be saved to: {:?}", log_file_path);
172
173 let mut cmd = Command::new("reth-bench");
175 cmd.args([
176 "new-payload-fcu",
177 "--rpc-url",
178 &self.rpc_url,
179 "--jwt-secret",
180 &self.jwt_secret,
181 "--from",
182 &from_block.to_string(),
183 "--to",
184 &to_block.to_string(),
185 "--output",
186 &output_dir.to_string_lossy(),
187 ]);
188
189 if let Some(ref wait_time) = self.wait_time {
191 cmd.args(["--wait-time", wait_time]);
192 }
193
194 cmd.env("RUST_LOG_STYLE", "never")
195 .stdout(std::process::Stdio::piped())
196 .stderr(std::process::Stdio::piped())
197 .kill_on_drop(true);
198
199 #[cfg(unix)]
201 {
202 cmd.process_group(0);
203 }
204
205 debug!("Executing reth-bench command: {:?}", cmd);
207
208 let mut child = cmd.spawn().wrap_err("Failed to start reth-bench process")?;
210
211 let stdout_lines = Arc::new(Mutex::new(Vec::new()));
213 let stderr_lines = Arc::new(Mutex::new(Vec::new()));
214
215 if let Some(stdout) = child.stdout.take() {
218 let stdout_lines_clone = stdout_lines.clone();
219 let log_file = AsyncFile::create(&log_file_path)
220 .await
221 .wrap_err(format!("Failed to create log file: {:?}", log_file_path))?;
222 tokio::spawn(async move {
223 let reader = BufReader::new(stdout);
224 let mut lines = reader.lines();
225 let mut log_file = log_file;
226 while let Ok(Some(line)) = lines.next_line().await {
227 debug!("[RETH-BENCH] {}", line);
228 if let Ok(mut captured) = stdout_lines_clone.lock() {
229 captured.push(line.clone());
230 }
231 let log_line = format!("{}\n", line);
233 if let Err(e) = log_file.write_all(log_line.as_bytes()).await {
234 debug!("Failed to write to log file: {}", e);
235 }
236 }
237 });
238 }
239
240 if let Some(stderr) = child.stderr.take() {
243 let stderr_lines_clone = stderr_lines.clone();
244 let log_file = AsyncFile::options()
245 .create(true)
246 .append(true)
247 .open(&log_file_path)
248 .await
249 .wrap_err(format!("Failed to open log file for stderr: {:?}", log_file_path))?;
250 tokio::spawn(async move {
251 let reader = BufReader::new(stderr);
252 let mut lines = reader.lines();
253 let mut log_file = log_file;
254 while let Ok(Some(line)) = lines.next_line().await {
255 debug!("[RETH-BENCH] {}", line);
256 if let Ok(mut captured) = stderr_lines_clone.lock() {
257 captured.push(line.clone());
258 }
259 let log_line = format!("{}\n", line);
261 if let Err(e) = log_file.write_all(log_line.as_bytes()).await {
262 debug!("Failed to write to log file: {}", e);
263 }
264 }
265 });
266 }
267
268 let status = child.wait().await.wrap_err("Failed to wait for reth-bench")?;
269
270 if !status.success() {
271 error!("reth-bench failed with exit code: {:?}", status.code());
273
274 if let Ok(stdout) = stdout_lines.lock() &&
275 !stdout.is_empty()
276 {
277 error!("reth-bench stdout:");
278 for line in stdout.iter() {
279 error!(" {}", line);
280 }
281 }
282
283 if let Ok(stderr) = stderr_lines.lock() &&
284 !stderr.is_empty()
285 {
286 error!("reth-bench stderr:");
287 for line in stderr.iter() {
288 error!(" {}", line);
289 }
290 }
291
292 return Err(eyre!("reth-bench failed with exit code: {:?}", status.code()));
293 }
294
295 info!("Benchmark completed");
296 Ok(())
297 }
298}