reth_bench_compare/
benchmark.rs1use crate::cli::Args;
4use eyre::{eyre, Result, WrapErr};
5use std::{
6 path::Path,
7 sync::{Arc, Mutex},
8};
9use tokio::{
10 fs::File as AsyncFile,
11 io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
12 process::Command,
13};
14use tracing::{debug, error, info, warn};
15
/// Settings used to launch `reth-bench` subprocesses for warmup and measured runs.
pub(crate) struct BenchmarkRunner {
    /// Value passed to `reth-bench` as `--rpc-url`.
    rpc_url: String,
    /// Value passed to `reth-bench` as `--jwt-secret` (a filesystem path rendered as a string).
    jwt_secret: String,
    /// Optional value forwarded as `--wait-time` for measured runs; warmup runs always use `0ms`.
    wait_time: Option<String>,
    /// When set, measured runs are started with `--wait-for-persistence`.
    wait_for_persistence: bool,
    /// Optional value forwarded as `--persistence-threshold`; only consulted when
    /// `wait_for_persistence` is enabled.
    persistence_threshold: Option<u64>,
    /// Number of blocks added to the starting block to form the warmup range.
    warmup_blocks: u64,
}
25
26impl BenchmarkRunner {
27 pub(crate) fn new(args: &Args) -> Self {
29 Self {
30 rpc_url: args.get_rpc_url(),
31 jwt_secret: args.jwt_secret_path().to_string_lossy().to_string(),
32 wait_time: args.wait_time.clone(),
33 wait_for_persistence: args.wait_for_persistence,
34 persistence_threshold: args.persistence_threshold,
35 warmup_blocks: args.get_warmup_blocks(),
36 }
37 }
38
39 pub(crate) async fn clear_fs_caches() -> Result<()> {
41 info!("Clearing filesystem caches...");
42
43 let sync_output =
45 Command::new("sync").output().await.wrap_err("Failed to execute sync command")?;
46
47 if !sync_output.status.success() {
48 return Err(eyre!("sync command failed"));
49 }
50
51 let drop_caches_cmd = Command::new("sudo")
54 .args(["-n", "sh", "-c", "echo 3 > /proc/sys/vm/drop_caches"])
55 .output()
56 .await;
57
58 match drop_caches_cmd {
59 Ok(output) if output.status.success() => {
60 info!("Successfully cleared filesystem caches");
61 Ok(())
62 }
63 Ok(output) => {
64 let stderr = String::from_utf8_lossy(&output.stderr);
65 if stderr.contains("sudo: a password is required") {
66 warn!("Unable to clear filesystem caches: sudo password required");
67 warn!(
68 "For optimal benchmarking, configure passwordless sudo for cache clearing:"
69 );
70 warn!(" echo '$USER ALL=(ALL) NOPASSWD: /bin/sh -c echo\\\\ [0-9]\\\\ \\\\>\\\\ /proc/sys/vm/drop_caches' | sudo tee /etc/sudoers.d/drop_caches");
71 Ok(())
72 } else {
73 Err(eyre!("Failed to clear filesystem caches: {}", stderr))
74 }
75 }
76 Err(e) => {
77 warn!("Unable to clear filesystem caches: {}", e);
78 Ok(())
79 }
80 }
81 }
82
83 pub(crate) async fn run_warmup(&self, from_block: u64) -> Result<()> {
85 let to_block = from_block + self.warmup_blocks;
86 info!(
87 "Running warmup benchmark from block {} to {} ({} blocks)",
88 from_block, to_block, self.warmup_blocks
89 );
90
91 let mut cmd = Command::new("reth-bench");
93 cmd.args([
94 "new-payload-fcu",
95 "--rpc-url",
96 &self.rpc_url,
97 "--jwt-secret",
98 &self.jwt_secret,
99 "--from",
100 &from_block.to_string(),
101 "--to",
102 &to_block.to_string(),
103 "--wait-time=0ms", ]);
105
106 cmd.env("RUST_LOG_STYLE", "never")
107 .stdout(std::process::Stdio::piped())
108 .stderr(std::process::Stdio::piped())
109 .kill_on_drop(true);
110
111 #[cfg(unix)]
113 {
114 cmd.process_group(0);
115 }
116
117 debug!("Executing warmup reth-bench command: {:?}", cmd);
118
119 let mut child = cmd.spawn().wrap_err("Failed to start warmup reth-bench process")?;
121
122 if let Some(stdout) = child.stdout.take() {
124 tokio::spawn(async move {
125 let reader = BufReader::new(stdout);
126 let mut lines = reader.lines();
127 while let Ok(Some(line)) = lines.next_line().await {
128 debug!("[WARMUP] {}", line);
129 }
130 });
131 }
132
133 if let Some(stderr) = child.stderr.take() {
134 tokio::spawn(async move {
135 let reader = BufReader::new(stderr);
136 let mut lines = reader.lines();
137 while let Ok(Some(line)) = lines.next_line().await {
138 debug!("[WARMUP] {}", line);
139 }
140 });
141 }
142
143 let status = child.wait().await.wrap_err("Failed to wait for warmup reth-bench")?;
144
145 if !status.success() {
146 return Err(eyre!("Warmup reth-bench failed with exit code: {:?}", status.code()));
147 }
148
149 info!("Warmup completed successfully");
150 Ok(())
151 }
152
153 pub(crate) async fn run_benchmark(
155 &self,
156 from_block: u64,
157 to_block: u64,
158 output_dir: &Path,
159 ) -> Result<()> {
160 info!(
161 "Running benchmark from block {} to {} (output: {:?})",
162 from_block, to_block, output_dir
163 );
164
165 std::fs::create_dir_all(output_dir)
167 .wrap_err_with(|| format!("Failed to create output directory: {output_dir:?}"))?;
168
169 let log_file_path = output_dir.join("reth_bench.log");
171 info!("reth-bench logs will be saved to: {:?}", log_file_path);
172
173 let mut cmd = Command::new("reth-bench");
175 cmd.args([
176 "new-payload-fcu",
177 "--rpc-url",
178 &self.rpc_url,
179 "--jwt-secret",
180 &self.jwt_secret,
181 "--from",
182 &from_block.to_string(),
183 "--to",
184 &to_block.to_string(),
185 "--output",
186 &output_dir.to_string_lossy(),
187 ]);
188
189 if let Some(ref wait_time) = self.wait_time {
192 cmd.args(["--wait-time", wait_time]);
193 }
194 if self.wait_for_persistence {
195 cmd.arg("--wait-for-persistence");
196
197 if let Some(threshold) = self.persistence_threshold {
199 cmd.args(["--persistence-threshold", &threshold.to_string()]);
200 }
201 }
202
203 cmd.env("RUST_LOG_STYLE", "never")
204 .stdout(std::process::Stdio::piped())
205 .stderr(std::process::Stdio::piped())
206 .kill_on_drop(true);
207
208 #[cfg(unix)]
210 {
211 cmd.process_group(0);
212 }
213
214 debug!("Executing reth-bench command: {:?}", cmd);
216
217 let mut child = cmd.spawn().wrap_err("Failed to start reth-bench process")?;
219
220 let stdout_lines = Arc::new(Mutex::new(Vec::new()));
222 let stderr_lines = Arc::new(Mutex::new(Vec::new()));
223
224 if let Some(stdout) = child.stdout.take() {
227 let stdout_lines_clone = stdout_lines.clone();
228 let log_file = AsyncFile::create(&log_file_path)
229 .await
230 .wrap_err(format!("Failed to create log file: {:?}", log_file_path))?;
231 tokio::spawn(async move {
232 let reader = BufReader::new(stdout);
233 let mut lines = reader.lines();
234 let mut log_file = log_file;
235 while let Ok(Some(line)) = lines.next_line().await {
236 debug!("[RETH-BENCH] {}", line);
237 if let Ok(mut captured) = stdout_lines_clone.lock() {
238 captured.push(line.clone());
239 }
240 let log_line = format!("{}\n", line);
242 if let Err(e) = log_file.write_all(log_line.as_bytes()).await {
243 debug!("Failed to write to log file: {}", e);
244 }
245 }
246 });
247 }
248
249 if let Some(stderr) = child.stderr.take() {
252 let stderr_lines_clone = stderr_lines.clone();
253 let log_file = AsyncFile::options()
254 .create(true)
255 .append(true)
256 .open(&log_file_path)
257 .await
258 .wrap_err(format!("Failed to open log file for stderr: {:?}", log_file_path))?;
259 tokio::spawn(async move {
260 let reader = BufReader::new(stderr);
261 let mut lines = reader.lines();
262 let mut log_file = log_file;
263 while let Ok(Some(line)) = lines.next_line().await {
264 debug!("[RETH-BENCH] {}", line);
265 if let Ok(mut captured) = stderr_lines_clone.lock() {
266 captured.push(line.clone());
267 }
268 let log_line = format!("{}\n", line);
270 if let Err(e) = log_file.write_all(log_line.as_bytes()).await {
271 debug!("Failed to write to log file: {}", e);
272 }
273 }
274 });
275 }
276
277 let status = child.wait().await.wrap_err("Failed to wait for reth-bench")?;
278
279 if !status.success() {
280 error!("reth-bench failed with exit code: {:?}", status.code());
282
283 if let Ok(stdout) = stdout_lines.lock() &&
284 !stdout.is_empty()
285 {
286 error!("reth-bench stdout:");
287 for line in stdout.iter() {
288 error!(" {}", line);
289 }
290 }
291
292 if let Ok(stderr) = stderr_lines.lock() &&
293 !stderr.is_empty()
294 {
295 error!("reth-bench stderr:");
296 for line in stderr.iter() {
297 error!(" {}", line);
298 }
299 }
300
301 return Err(eyre!("reth-bench failed with exit code: {:?}", status.code()));
302 }
303
304 info!("Benchmark completed");
305 Ok(())
306 }
307}