reth_bench_compare/
benchmark.rs

1//! Benchmark execution using reth-bench.
2
3use crate::cli::Args;
4use eyre::{eyre, Result, WrapErr};
5use std::{
6    path::Path,
7    sync::{Arc, Mutex},
8};
9use tokio::{
10    fs::File as AsyncFile,
11    io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
12    process::Command,
13};
14use tracing::{debug, error, info, warn};
15
16/// Manages benchmark execution using reth-bench
///
/// Stores the settings, copied from the CLI arguments in `new`, that are forwarded to each
/// `reth-bench new-payload-fcu` invocation made by `run_warmup` and `run_benchmark`.
17pub(crate) struct BenchmarkRunner {
    /// Value passed to reth-bench as `--rpc-url`; taken from `Args::get_rpc_url()`.
18    rpc_url: String,
    /// Value passed to reth-bench as `--jwt-secret`; the lossy string form of
    /// `Args::jwt_secret_path()`.
19    jwt_secret: String,
    /// Optional value forwarded to reth-bench as `--wait-time`; the flag is omitted when `None`.
20    wait_time: Option<String>,
    /// Number of blocks added to the warmup start block to obtain the warmup `--to` block.
21    warmup_blocks: u64,
22}
23
24impl BenchmarkRunner {
25    /// Create a new `BenchmarkRunner` from CLI arguments
26    pub(crate) fn new(args: &Args) -> Self {
27        Self {
28            rpc_url: args.get_rpc_url(),
29            jwt_secret: args.jwt_secret_path().to_string_lossy().to_string(),
30            wait_time: args.wait_time.clone(),
31            warmup_blocks: args.get_warmup_blocks(),
32        }
33    }
34
35    /// Clear filesystem caches (page cache, dentries, and inodes)
36    pub(crate) async fn clear_fs_caches() -> Result<()> {
37        info!("Clearing filesystem caches...");
38
39        // First sync to ensure all pending writes are flushed
40        let sync_output =
41            Command::new("sync").output().await.wrap_err("Failed to execute sync command")?;
42
43        if !sync_output.status.success() {
44            return Err(eyre!("sync command failed"));
45        }
46
47        // Drop caches - requires sudo/root permissions
48        // 3 = drop pagecache, dentries, and inodes
49        let drop_caches_cmd = Command::new("sudo")
50            .args(["-n", "sh", "-c", "echo 3 > /proc/sys/vm/drop_caches"])
51            .output()
52            .await;
53
54        match drop_caches_cmd {
55            Ok(output) if output.status.success() => {
56                info!("Successfully cleared filesystem caches");
57                Ok(())
58            }
59            Ok(output) => {
60                let stderr = String::from_utf8_lossy(&output.stderr);
61                if stderr.contains("sudo: a password is required") {
62                    warn!("Unable to clear filesystem caches: sudo password required");
63                    warn!(
64                        "For optimal benchmarking, configure passwordless sudo for cache clearing:"
65                    );
66                    warn!("  echo '$USER ALL=(ALL) NOPASSWD: /bin/sh -c echo\\\\ [0-9]\\\\ \\\\>\\\\ /proc/sys/vm/drop_caches' | sudo tee /etc/sudoers.d/drop_caches");
67                    Ok(())
68                } else {
69                    Err(eyre!("Failed to clear filesystem caches: {}", stderr))
70                }
71            }
72            Err(e) => {
73                warn!("Unable to clear filesystem caches: {}", e);
74                Ok(())
75            }
76        }
77    }
78
79    /// Run a warmup benchmark for cache warming
80    pub(crate) async fn run_warmup(&self, from_block: u64) -> Result<()> {
81        let to_block = from_block + self.warmup_blocks;
82        info!(
83            "Running warmup benchmark from block {} to {} ({} blocks)",
84            from_block, to_block, self.warmup_blocks
85        );
86
87        // Build the reth-bench command for warmup (no output flag)
88        let mut cmd = Command::new("reth-bench");
89        cmd.args([
90            "new-payload-fcu",
91            "--rpc-url",
92            &self.rpc_url,
93            "--jwt-secret",
94            &self.jwt_secret,
95            "--from",
96            &from_block.to_string(),
97            "--to",
98            &to_block.to_string(),
99        ]);
100
101        // Add wait-time argument if provided
102        if let Some(ref wait_time) = self.wait_time {
103            cmd.args(["--wait-time", wait_time]);
104        }
105
106        cmd.env("RUST_LOG_STYLE", "never")
107            .stdout(std::process::Stdio::piped())
108            .stderr(std::process::Stdio::piped())
109            .kill_on_drop(true);
110
111        // Set process group for consistent signal handling
112        #[cfg(unix)]
113        {
114            cmd.process_group(0);
115        }
116
117        debug!("Executing warmup reth-bench command: {:?}", cmd);
118
119        // Execute the warmup benchmark
120        let mut child = cmd.spawn().wrap_err("Failed to start warmup reth-bench process")?;
121
122        // Stream output at debug level
123        if let Some(stdout) = child.stdout.take() {
124            tokio::spawn(async move {
125                let reader = BufReader::new(stdout);
126                let mut lines = reader.lines();
127                while let Ok(Some(line)) = lines.next_line().await {
128                    debug!("[WARMUP] {}", line);
129                }
130            });
131        }
132
133        if let Some(stderr) = child.stderr.take() {
134            tokio::spawn(async move {
135                let reader = BufReader::new(stderr);
136                let mut lines = reader.lines();
137                while let Ok(Some(line)) = lines.next_line().await {
138                    debug!("[WARMUP] {}", line);
139                }
140            });
141        }
142
143        let status = child.wait().await.wrap_err("Failed to wait for warmup reth-bench")?;
144
145        if !status.success() {
146            return Err(eyre!("Warmup reth-bench failed with exit code: {:?}", status.code()));
147        }
148
149        info!("Warmup completed successfully");
150        Ok(())
151    }
152
153    /// Run a benchmark for the specified block range
154    pub(crate) async fn run_benchmark(
155        &self,
156        from_block: u64,
157        to_block: u64,
158        output_dir: &Path,
159    ) -> Result<()> {
160        info!(
161            "Running benchmark from block {} to {} (output: {:?})",
162            from_block, to_block, output_dir
163        );
164
165        // Ensure output directory exists
166        std::fs::create_dir_all(output_dir)
167            .wrap_err_with(|| format!("Failed to create output directory: {output_dir:?}"))?;
168
169        // Create log file path for reth-bench output
170        let log_file_path = output_dir.join("reth_bench.log");
171        info!("reth-bench logs will be saved to: {:?}", log_file_path);
172
173        // Build the reth-bench command
174        let mut cmd = Command::new("reth-bench");
175        cmd.args([
176            "new-payload-fcu",
177            "--rpc-url",
178            &self.rpc_url,
179            "--jwt-secret",
180            &self.jwt_secret,
181            "--from",
182            &from_block.to_string(),
183            "--to",
184            &to_block.to_string(),
185            "--output",
186            &output_dir.to_string_lossy(),
187        ]);
188
189        // Add wait-time argument if provided
190        if let Some(ref wait_time) = self.wait_time {
191            cmd.args(["--wait-time", wait_time]);
192        }
193
194        cmd.env("RUST_LOG_STYLE", "never")
195            .stdout(std::process::Stdio::piped())
196            .stderr(std::process::Stdio::piped())
197            .kill_on_drop(true);
198
199        // Set process group for consistent signal handling
200        #[cfg(unix)]
201        {
202            cmd.process_group(0);
203        }
204
205        // Debug log the command
206        debug!("Executing reth-bench command: {:?}", cmd);
207
208        // Execute the benchmark
209        let mut child = cmd.spawn().wrap_err("Failed to start reth-bench process")?;
210
211        // Capture stdout and stderr for error reporting
212        let stdout_lines = Arc::new(Mutex::new(Vec::new()));
213        let stderr_lines = Arc::new(Mutex::new(Vec::new()));
214
215        // Stream stdout with prefix at debug level, capture for error reporting, and write to log
216        // file
217        if let Some(stdout) = child.stdout.take() {
218            let stdout_lines_clone = stdout_lines.clone();
219            let log_file = AsyncFile::create(&log_file_path)
220                .await
221                .wrap_err(format!("Failed to create log file: {:?}", log_file_path))?;
222            tokio::spawn(async move {
223                let reader = BufReader::new(stdout);
224                let mut lines = reader.lines();
225                let mut log_file = log_file;
226                while let Ok(Some(line)) = lines.next_line().await {
227                    debug!("[RETH-BENCH] {}", line);
228                    if let Ok(mut captured) = stdout_lines_clone.lock() {
229                        captured.push(line.clone());
230                    }
231                    // Write to log file (reth-bench output already has timestamps if needed)
232                    let log_line = format!("{}\n", line);
233                    if let Err(e) = log_file.write_all(log_line.as_bytes()).await {
234                        debug!("Failed to write to log file: {}", e);
235                    }
236                }
237            });
238        }
239
240        // Stream stderr with prefix at debug level, capture for error reporting, and write to log
241        // file
242        if let Some(stderr) = child.stderr.take() {
243            let stderr_lines_clone = stderr_lines.clone();
244            let log_file = AsyncFile::options()
245                .create(true)
246                .append(true)
247                .open(&log_file_path)
248                .await
249                .wrap_err(format!("Failed to open log file for stderr: {:?}", log_file_path))?;
250            tokio::spawn(async move {
251                let reader = BufReader::new(stderr);
252                let mut lines = reader.lines();
253                let mut log_file = log_file;
254                while let Ok(Some(line)) = lines.next_line().await {
255                    debug!("[RETH-BENCH] {}", line);
256                    if let Ok(mut captured) = stderr_lines_clone.lock() {
257                        captured.push(line.clone());
258                    }
259                    // Write to log file (reth-bench output already has timestamps if needed)
260                    let log_line = format!("{}\n", line);
261                    if let Err(e) = log_file.write_all(log_line.as_bytes()).await {
262                        debug!("Failed to write to log file: {}", e);
263                    }
264                }
265            });
266        }
267
268        let status = child.wait().await.wrap_err("Failed to wait for reth-bench")?;
269
270        if !status.success() {
271            // Print all captured output when command fails
272            error!("reth-bench failed with exit code: {:?}", status.code());
273
274            if let Ok(stdout) = stdout_lines.lock() &&
275                !stdout.is_empty()
276            {
277                error!("reth-bench stdout:");
278                for line in stdout.iter() {
279                    error!("  {}", line);
280                }
281            }
282
283            if let Ok(stderr) = stderr_lines.lock() &&
284                !stderr.is_empty()
285            {
286                error!("reth-bench stderr:");
287                for line in stderr.iter() {
288                    error!("  {}", line);
289                }
290            }
291
292            return Err(eyre!("reth-bench failed with exit code: {:?}", status.code()));
293        }
294
295        info!("Benchmark completed");
296        Ok(())
297    }
298}