Skip to main content

reth_bench_compare/
benchmark.rs

1//! Benchmark execution using reth-bench.
2
3use crate::cli::Args;
4use eyre::{eyre, Result, WrapErr};
5use std::{
6    path::Path,
7    sync::{Arc, Mutex},
8};
9use tokio::{
10    fs::File as AsyncFile,
11    io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
12    process::Command,
13};
14use tracing::{debug, error, info, warn};
15
/// Drives `reth-bench` as a child process for warmup and measured benchmark runs.
///
/// Every field is captured once from the CLI arguments in [`BenchmarkRunner::new`] and later
/// translated into `reth-bench new-payload-fcu` command-line flags.
pub(crate) struct BenchmarkRunner {
    /// Value forwarded to `reth-bench` as `--rpc-url`.
    rpc_url: String,
    /// JWT secret file path (as a string), forwarded as `--jwt-secret`.
    jwt_secret: String,
    /// Optional value forwarded as `--wait-time` for the measured run; warmup always uses `0ms`.
    wait_time: Option<String>,
    /// When set, the measured run is started with `--wait-for-persistence`.
    wait_for_persistence: bool,
    /// Optional value forwarded as `--persistence-threshold`; only used together with
    /// `wait_for_persistence`.
    persistence_threshold: Option<u64>,
    /// Number of blocks added to the warmup start block to obtain the warmup end block.
    warmup_blocks: u64,
}
25
26impl BenchmarkRunner {
27    /// Create a new `BenchmarkRunner` from CLI arguments
28    pub(crate) fn new(args: &Args) -> Self {
29        Self {
30            rpc_url: args.get_rpc_url(),
31            jwt_secret: args.jwt_secret_path().to_string_lossy().to_string(),
32            wait_time: args.wait_time.clone(),
33            wait_for_persistence: args.wait_for_persistence,
34            persistence_threshold: args.persistence_threshold,
35            warmup_blocks: args.get_warmup_blocks(),
36        }
37    }
38
39    /// Clear filesystem caches (page cache, dentries, and inodes)
40    pub(crate) async fn clear_fs_caches() -> Result<()> {
41        info!("Clearing filesystem caches...");
42
43        // First sync to ensure all pending writes are flushed
44        let sync_output =
45            Command::new("sync").output().await.wrap_err("Failed to execute sync command")?;
46
47        if !sync_output.status.success() {
48            return Err(eyre!("sync command failed"));
49        }
50
51        // Drop caches - requires sudo/root permissions
52        // 3 = drop pagecache, dentries, and inodes
53        let drop_caches_cmd = Command::new("sudo")
54            .args(["-n", "sh", "-c", "echo 3 > /proc/sys/vm/drop_caches"])
55            .output()
56            .await;
57
58        match drop_caches_cmd {
59            Ok(output) if output.status.success() => {
60                info!("Successfully cleared filesystem caches");
61                Ok(())
62            }
63            Ok(output) => {
64                let stderr = String::from_utf8_lossy(&output.stderr);
65                if stderr.contains("sudo: a password is required") {
66                    warn!("Unable to clear filesystem caches: sudo password required");
67                    warn!(
68                        "For optimal benchmarking, configure passwordless sudo for cache clearing:"
69                    );
70                    warn!("  echo '$USER ALL=(ALL) NOPASSWD: /bin/sh -c echo\\\\ [0-9]\\\\ \\\\>\\\\ /proc/sys/vm/drop_caches' | sudo tee /etc/sudoers.d/drop_caches");
71                    Ok(())
72                } else {
73                    Err(eyre!("Failed to clear filesystem caches: {}", stderr))
74                }
75            }
76            Err(e) => {
77                warn!("Unable to clear filesystem caches: {}", e);
78                Ok(())
79            }
80        }
81    }
82
83    /// Run a warmup benchmark for cache warming
84    pub(crate) async fn run_warmup(&self, from_block: u64) -> Result<()> {
85        let to_block = from_block + self.warmup_blocks;
86        info!(
87            "Running warmup benchmark from block {} to {} ({} blocks)",
88            from_block, to_block, self.warmup_blocks
89        );
90
91        // Build the reth-bench command for warmup (no output flag)
92        let mut cmd = Command::new("reth-bench");
93        cmd.args([
94            "new-payload-fcu",
95            "--rpc-url",
96            &self.rpc_url,
97            "--jwt-secret",
98            &self.jwt_secret,
99            "--from",
100            &from_block.to_string(),
101            "--to",
102            &to_block.to_string(),
103            "--wait-time=0ms", // Warmup should avoid persistence waits.
104        ]);
105
106        cmd.env("RUST_LOG_STYLE", "never")
107            .stdout(std::process::Stdio::piped())
108            .stderr(std::process::Stdio::piped())
109            .kill_on_drop(true);
110
111        // Set process group for consistent signal handling
112        #[cfg(unix)]
113        {
114            cmd.process_group(0);
115        }
116
117        debug!("Executing warmup reth-bench command: {:?}", cmd);
118
119        // Execute the warmup benchmark
120        let mut child = cmd.spawn().wrap_err("Failed to start warmup reth-bench process")?;
121
122        // Stream output at debug level
123        if let Some(stdout) = child.stdout.take() {
124            tokio::spawn(async move {
125                let reader = BufReader::new(stdout);
126                let mut lines = reader.lines();
127                while let Ok(Some(line)) = lines.next_line().await {
128                    debug!("[WARMUP] {}", line);
129                }
130            });
131        }
132
133        if let Some(stderr) = child.stderr.take() {
134            tokio::spawn(async move {
135                let reader = BufReader::new(stderr);
136                let mut lines = reader.lines();
137                while let Ok(Some(line)) = lines.next_line().await {
138                    debug!("[WARMUP] {}", line);
139                }
140            });
141        }
142
143        let status = child.wait().await.wrap_err("Failed to wait for warmup reth-bench")?;
144
145        if !status.success() {
146            return Err(eyre!("Warmup reth-bench failed with exit code: {:?}", status.code()));
147        }
148
149        info!("Warmup completed successfully");
150        Ok(())
151    }
152
153    /// Run a benchmark for the specified block range
154    pub(crate) async fn run_benchmark(
155        &self,
156        from_block: u64,
157        to_block: u64,
158        output_dir: &Path,
159    ) -> Result<()> {
160        info!(
161            "Running benchmark from block {} to {} (output: {:?})",
162            from_block, to_block, output_dir
163        );
164
165        // Ensure output directory exists
166        std::fs::create_dir_all(output_dir)
167            .wrap_err_with(|| format!("Failed to create output directory: {output_dir:?}"))?;
168
169        // Create log file path for reth-bench output
170        let log_file_path = output_dir.join("reth_bench.log");
171        info!("reth-bench logs will be saved to: {:?}", log_file_path);
172
173        // Build the reth-bench command
174        let mut cmd = Command::new("reth-bench");
175        cmd.args([
176            "new-payload-fcu",
177            "--rpc-url",
178            &self.rpc_url,
179            "--jwt-secret",
180            &self.jwt_secret,
181            "--from",
182            &from_block.to_string(),
183            "--to",
184            &to_block.to_string(),
185            "--output",
186            &output_dir.to_string_lossy(),
187        ]);
188
189        // Configure wait mode: both can be used together
190        // When both are set: wait at least wait_time, and also wait for persistence if needed
191        if let Some(ref wait_time) = self.wait_time {
192            cmd.args(["--wait-time", wait_time]);
193        }
194        if self.wait_for_persistence {
195            cmd.arg("--wait-for-persistence");
196
197            // Add persistence threshold if specified
198            if let Some(threshold) = self.persistence_threshold {
199                cmd.args(["--persistence-threshold", &threshold.to_string()]);
200            }
201        }
202
203        cmd.env("RUST_LOG_STYLE", "never")
204            .stdout(std::process::Stdio::piped())
205            .stderr(std::process::Stdio::piped())
206            .kill_on_drop(true);
207
208        // Set process group for consistent signal handling
209        #[cfg(unix)]
210        {
211            cmd.process_group(0);
212        }
213
214        // Debug log the command
215        debug!("Executing reth-bench command: {:?}", cmd);
216
217        // Execute the benchmark
218        let mut child = cmd.spawn().wrap_err("Failed to start reth-bench process")?;
219
220        // Capture stdout and stderr for error reporting
221        let stdout_lines = Arc::new(Mutex::new(Vec::new()));
222        let stderr_lines = Arc::new(Mutex::new(Vec::new()));
223
224        // Stream stdout with prefix at debug level, capture for error reporting, and write to log
225        // file
226        if let Some(stdout) = child.stdout.take() {
227            let stdout_lines_clone = stdout_lines.clone();
228            let log_file = AsyncFile::create(&log_file_path)
229                .await
230                .wrap_err(format!("Failed to create log file: {:?}", log_file_path))?;
231            tokio::spawn(async move {
232                let reader = BufReader::new(stdout);
233                let mut lines = reader.lines();
234                let mut log_file = log_file;
235                while let Ok(Some(line)) = lines.next_line().await {
236                    debug!("[RETH-BENCH] {}", line);
237                    if let Ok(mut captured) = stdout_lines_clone.lock() {
238                        captured.push(line.clone());
239                    }
240                    // Write to log file (reth-bench output already has timestamps if needed)
241                    let log_line = format!("{}\n", line);
242                    if let Err(e) = log_file.write_all(log_line.as_bytes()).await {
243                        debug!("Failed to write to log file: {}", e);
244                    }
245                }
246            });
247        }
248
249        // Stream stderr with prefix at debug level, capture for error reporting, and write to log
250        // file
251        if let Some(stderr) = child.stderr.take() {
252            let stderr_lines_clone = stderr_lines.clone();
253            let log_file = AsyncFile::options()
254                .create(true)
255                .append(true)
256                .open(&log_file_path)
257                .await
258                .wrap_err(format!("Failed to open log file for stderr: {:?}", log_file_path))?;
259            tokio::spawn(async move {
260                let reader = BufReader::new(stderr);
261                let mut lines = reader.lines();
262                let mut log_file = log_file;
263                while let Ok(Some(line)) = lines.next_line().await {
264                    debug!("[RETH-BENCH] {}", line);
265                    if let Ok(mut captured) = stderr_lines_clone.lock() {
266                        captured.push(line.clone());
267                    }
268                    // Write to log file (reth-bench output already has timestamps if needed)
269                    let log_line = format!("{}\n", line);
270                    if let Err(e) = log_file.write_all(log_line.as_bytes()).await {
271                        debug!("Failed to write to log file: {}", e);
272                    }
273                }
274            });
275        }
276
277        let status = child.wait().await.wrap_err("Failed to wait for reth-bench")?;
278
279        if !status.success() {
280            // Print all captured output when command fails
281            error!("reth-bench failed with exit code: {:?}", status.code());
282
283            if let Ok(stdout) = stdout_lines.lock() &&
284                !stdout.is_empty()
285            {
286                error!("reth-bench stdout:");
287                for line in stdout.iter() {
288                    error!("  {}", line);
289                }
290            }
291
292            if let Ok(stderr) = stderr_lines.lock() &&
293                !stderr.is_empty()
294            {
295                error!("reth-bench stderr:");
296                for line in stderr.iter() {
297                    error!("  {}", line);
298                }
299            }
300
301            return Err(eyre!("reth-bench failed with exit code: {:?}", status.code()));
302        }
303
304        info!("Benchmark completed");
305        Ok(())
306    }
307}