reth_bench_compare/
node.rs

1//! Node management for starting, stopping, and controlling reth instances.
2
3use crate::cli::Args;
4use alloy_provider::{Provider, ProviderBuilder};
5use alloy_rpc_types_eth::SyncStatus;
6use eyre::{eyre, OptionExt, Result, WrapErr};
7#[cfg(unix)]
8use nix::sys::signal::{killpg, Signal};
9#[cfg(unix)]
10use nix::unistd::Pid;
11use reth_chainspec::Chain;
12use std::{fs, path::PathBuf, time::Duration};
13use tokio::{
14    fs::File as AsyncFile,
15    io::{AsyncBufReadExt, AsyncWriteExt, BufReader as AsyncBufReader},
16    process::Command,
17    time::{sleep, timeout},
18};
19use tracing::{debug, info, warn};
20
21/// Manages reth node lifecycle and operations
22pub(crate) struct NodeManager {
23    datadir: Option<String>,
24    metrics_port: u16,
25    chain: Chain,
26    use_sudo: bool,
27    binary_path: Option<std::path::PathBuf>,
28    enable_profiling: bool,
29    output_dir: PathBuf,
30    additional_reth_args: Vec<String>,
31    comparison_dir: Option<PathBuf>,
32    tracing_endpoint: Option<String>,
33    otlp_max_queue_size: usize,
34}
35
36impl NodeManager {
37    /// Create a new `NodeManager` with configuration from CLI args
38    pub(crate) fn new(args: &Args) -> Self {
39        Self {
40            datadir: Some(args.datadir_path().to_string_lossy().to_string()),
41            metrics_port: args.metrics_port,
42            chain: args.chain,
43            use_sudo: args.sudo,
44            binary_path: None,
45            enable_profiling: args.profile,
46            output_dir: args.output_dir_path(),
47            additional_reth_args: args.reth_args.clone(),
48            comparison_dir: None,
49            tracing_endpoint: args.traces.otlp.as_ref().map(|u| u.to_string()),
50            otlp_max_queue_size: args.otlp_max_queue_size,
51        }
52    }
53
54    /// Set the comparison directory path for logging
55    pub(crate) fn set_comparison_dir(&mut self, dir: PathBuf) {
56        self.comparison_dir = Some(dir);
57    }
58
59    /// Get the log file path for a given reference type
60    fn get_log_file_path(&self, ref_type: &str) -> Result<PathBuf> {
61        let comparison_dir = self
62            .comparison_dir
63            .as_ref()
64            .ok_or_eyre("Comparison directory not set. Call set_comparison_dir first.")?;
65
66        // The comparison directory already contains the full path to results/<timestamp>
67        let log_dir = comparison_dir.join(ref_type);
68
69        // Create the directory if it doesn't exist
70        fs::create_dir_all(&log_dir)
71            .wrap_err(format!("Failed to create log directory: {:?}", log_dir))?;
72
73        let log_file = log_dir.join("reth_node.log");
74        Ok(log_file)
75    }
76
77    /// Get the perf event max sample rate from the system, capped at 10000
78    fn get_perf_sample_rate(&self) -> Option<String> {
79        let perf_rate_file = "/proc/sys/kernel/perf_event_max_sample_rate";
80        if let Ok(content) = fs::read_to_string(perf_rate_file) {
81            let rate_str = content.trim();
82            if !rate_str.is_empty() {
83                if let Ok(system_rate) = rate_str.parse::<u32>() {
84                    let capped_rate = std::cmp::min(system_rate, 10000);
85                    info!(
86                        "Detected perf_event_max_sample_rate: {}, using: {}",
87                        system_rate, capped_rate
88                    );
89                    return Some(capped_rate.to_string());
90                }
91                warn!("Failed to parse perf_event_max_sample_rate: {}", rate_str);
92            }
93        }
94        None
95    }
96
97    /// Get the absolute path to samply using 'which' command
98    async fn get_samply_path(&self) -> Result<String> {
99        let output = Command::new("which")
100            .arg("samply")
101            .output()
102            .await
103            .wrap_err("Failed to execute 'which samply' command")?;
104
105        if !output.status.success() {
106            return Err(eyre!("samply not found in PATH"));
107        }
108
109        let samply_path = String::from_utf8(output.stdout)
110            .wrap_err("samply path is not valid UTF-8")?
111            .trim()
112            .to_string();
113
114        if samply_path.is_empty() {
115            return Err(eyre!("which samply returned empty path"));
116        }
117
118        Ok(samply_path)
119    }
120
121    /// Build reth arguments as a vector of strings
122    fn build_reth_args(
123        &self,
124        binary_path_str: &str,
125        additional_args: &[String],
126        ref_type: &str,
127    ) -> (Vec<String>, String) {
128        let mut reth_args = vec![binary_path_str.to_string(), "node".to_string()];
129
130        // Add chain argument (skip for mainnet as it's the default)
131        let chain_str = self.chain.to_string();
132        if chain_str != "mainnet" {
133            reth_args.extend_from_slice(&["--chain".to_string(), chain_str.clone()]);
134        }
135
136        // Add datadir if specified
137        if let Some(ref datadir) = self.datadir {
138            reth_args.extend_from_slice(&["--datadir".to_string(), datadir.clone()]);
139        }
140
141        // Add reth-specific arguments
142        let metrics_arg = format!("0.0.0.0:{}", self.metrics_port);
143        reth_args.extend_from_slice(&[
144            "--engine.accept-execution-requests-hash".to_string(),
145            "--metrics".to_string(),
146            metrics_arg,
147            "--http".to_string(),
148            "--http.api".to_string(),
149            "eth".to_string(),
150            "--disable-discovery".to_string(),
151            "--trusted-only".to_string(),
152        ]);
153
154        // Add tracing arguments if OTLP endpoint is configured
155        if let Some(ref endpoint) = self.tracing_endpoint {
156            info!("Enabling OTLP tracing export to: {} (service: reth-{})", endpoint, ref_type);
157            // Endpoint requires equals per clap settings in reth
158            reth_args.push(format!("--tracing-otlp={}", endpoint));
159        }
160
161        // Add any additional arguments passed via command line (common to both baseline and
162        // feature)
163        reth_args.extend_from_slice(&self.additional_reth_args);
164
165        // Add reference-specific additional arguments
166        reth_args.extend_from_slice(additional_args);
167
168        (reth_args, chain_str)
169    }
170
171    /// Create a command for profiling mode
172    async fn create_profiling_command(
173        &self,
174        ref_type: &str,
175        reth_args: &[String],
176    ) -> Result<Command> {
177        // Create profiles directory if it doesn't exist
178        let profile_dir = self.output_dir.join("profiles");
179        fs::create_dir_all(&profile_dir).wrap_err("Failed to create profiles directory")?;
180
181        let profile_path = profile_dir.join(format!("{}.json.gz", ref_type));
182        info!("Starting reth node with samply profiling...");
183        info!("Profile output: {:?}", profile_path);
184
185        // Get absolute path to samply
186        let samply_path = self.get_samply_path().await?;
187
188        let mut cmd = if self.use_sudo {
189            let mut sudo_cmd = Command::new("sudo");
190            sudo_cmd.arg(&samply_path);
191            sudo_cmd
192        } else {
193            Command::new(&samply_path)
194        };
195
196        // Add samply arguments
197        cmd.args(["record", "--save-only", "-o", &profile_path.to_string_lossy()]);
198
199        // Add rate argument if available
200        if let Some(rate) = self.get_perf_sample_rate() {
201            cmd.args(["--rate", &rate]);
202        }
203
204        // Add separator and complete reth command
205        cmd.arg("--");
206        cmd.args(reth_args);
207
208        // Set environment variable to disable log styling
209        cmd.env("RUST_LOG_STYLE", "never");
210
211        Ok(cmd)
212    }
213
214    /// Create a command for direct reth execution
215    fn create_direct_command(&self, reth_args: &[String]) -> Command {
216        let binary_path = &reth_args[0];
217
218        let mut cmd = if self.use_sudo {
219            info!("Starting reth node with sudo...");
220            let mut sudo_cmd = Command::new("sudo");
221            sudo_cmd.args(reth_args);
222            sudo_cmd
223        } else {
224            info!("Starting reth node...");
225            let mut reth_cmd = Command::new(binary_path);
226            reth_cmd.args(&reth_args[1..]); // Skip the binary path since it's the command
227            reth_cmd
228        };
229
230        // Set environment variable to disable log styling
231        cmd.env("RUST_LOG_STYLE", "never");
232
233        cmd
234    }
235
236    /// Start a reth node using the specified binary path and return the process handle
237    pub(crate) async fn start_node(
238        &mut self,
239        binary_path: &std::path::Path,
240        _git_ref: &str,
241        ref_type: &str,
242        additional_args: &[String],
243    ) -> Result<tokio::process::Child> {
244        // Store the binary path for later use (e.g., in unwind_to_block)
245        self.binary_path = Some(binary_path.to_path_buf());
246
247        let binary_path_str = binary_path.to_string_lossy();
248        let (reth_args, _) = self.build_reth_args(&binary_path_str, additional_args, ref_type);
249
250        // Log additional arguments if any
251        if !self.additional_reth_args.is_empty() {
252            info!("Using common additional reth arguments: {:?}", self.additional_reth_args);
253        }
254        if !additional_args.is_empty() {
255            info!("Using reference-specific additional reth arguments: {:?}", additional_args);
256        }
257
258        let mut cmd = if self.enable_profiling {
259            self.create_profiling_command(ref_type, &reth_args).await?
260        } else {
261            self.create_direct_command(&reth_args)
262        };
263
264        // Set process group for better signal handling
265        #[cfg(unix)]
266        {
267            cmd.process_group(0);
268        }
269
270        // Set high queue size to prevent trace dropping during benchmarks
271        if self.tracing_endpoint.is_some() {
272            cmd.env("OTEL_BSP_MAX_QUEUE_SIZE", self.otlp_max_queue_size.to_string()); // Traces
273            cmd.env("OTEL_BLRP_MAX_QUEUE_SIZE", "10000"); // Logs
274
275            // Set service name to differentiate baseline vs feature runs in Jaeger
276            cmd.env("OTEL_SERVICE_NAME", format!("reth-{}", ref_type));
277        }
278
279        debug!("Executing reth command: {cmd:?}");
280
281        let mut child = cmd
282            .stdout(std::process::Stdio::piped())
283            .stderr(std::process::Stdio::piped())
284            .kill_on_drop(true) // Kill on drop so that on Ctrl-C for parent process we stop all child processes
285            .spawn()
286            .wrap_err("Failed to start reth node")?;
287
288        info!(
289            "Reth node started with PID: {:?} (binary: {})",
290            child.id().ok_or_eyre("Reth node is not running")?,
291            binary_path_str
292        );
293
294        // Prepare log file path
295        let log_file_path = self.get_log_file_path(ref_type)?;
296        info!("Reth node logs will be saved to: {:?}", log_file_path);
297
298        // Stream stdout and stderr with prefixes at debug level and to log file
299        if let Some(stdout) = child.stdout.take() {
300            let log_file = AsyncFile::create(&log_file_path)
301                .await
302                .wrap_err(format!("Failed to create log file: {:?}", log_file_path))?;
303            tokio::spawn(async move {
304                let reader = AsyncBufReader::new(stdout);
305                let mut lines = reader.lines();
306                let mut log_file = log_file;
307                while let Ok(Some(line)) = lines.next_line().await {
308                    debug!("[RETH] {}", line);
309                    // Write to log file (reth already includes timestamps)
310                    let log_line = format!("{}\n", line);
311                    if let Err(e) = log_file.write_all(log_line.as_bytes()).await {
312                        debug!("Failed to write to log file: {}", e);
313                    }
314                }
315            });
316        }
317
318        if let Some(stderr) = child.stderr.take() {
319            let log_file = AsyncFile::options()
320                .create(true)
321                .append(true)
322                .open(&log_file_path)
323                .await
324                .wrap_err(format!("Failed to open log file for stderr: {:?}", log_file_path))?;
325            tokio::spawn(async move {
326                let reader = AsyncBufReader::new(stderr);
327                let mut lines = reader.lines();
328                let mut log_file = log_file;
329                while let Ok(Some(line)) = lines.next_line().await {
330                    debug!("[RETH] {}", line);
331                    // Write to log file (reth already includes timestamps)
332                    let log_line = format!("{}\n", line);
333                    if let Err(e) = log_file.write_all(log_line.as_bytes()).await {
334                        debug!("Failed to write to log file: {}", e);
335                    }
336                }
337            });
338        }
339
340        // Give the node a moment to start up
341        sleep(Duration::from_secs(5)).await;
342
343        Ok(child)
344    }
345
346    /// Wait for the node to be ready and return its current tip
347    pub(crate) async fn wait_for_node_ready_and_get_tip(&self) -> Result<u64> {
348        info!("Waiting for node to be ready and synced...");
349
350        let max_wait = Duration::from_secs(120); // 2 minutes to allow for sync
351        let check_interval = Duration::from_secs(2);
352        let rpc_url = "http://localhost:8545";
353
354        // Create Alloy provider
355        let url = rpc_url.parse().map_err(|e| eyre!("Invalid RPC URL '{}': {}", rpc_url, e))?;
356        let provider = ProviderBuilder::new().connect_http(url);
357
358        timeout(max_wait, async {
359            loop {
360                // First check if RPC is up and node is not syncing
361                match provider.syncing().await {
362                    Ok(sync_result) => {
363                        match sync_result {
364                            SyncStatus::Info(sync_info) => {
365                                debug!("Node is still syncing {sync_info:?}, waiting...");
366                            }
367                            _ => {
368                                // Node is not syncing, now get the tip
369                                match provider.get_block_number().await {
370                                    Ok(tip) => {
371                                        info!("Node is ready and not syncing at block: {}", tip);
372                                        return Ok(tip);
373                                    }
374                                    Err(e) => {
375                                        debug!("Failed to get block number: {}", e);
376                                    }
377                                }
378                            }
379                        }
380                    }
381                    Err(e) => {
382                        debug!("Node RPC not ready yet or failed to check sync status: {}", e);
383                    }
384                }
385
386                sleep(check_interval).await;
387            }
388        })
389        .await
390        .wrap_err("Timed out waiting for node to be ready and synced")?
391    }
392
393    /// Stop the reth node gracefully
394    pub(crate) async fn stop_node(&self, child: &mut tokio::process::Child) -> Result<()> {
395        let pid = child.id().expect("Child process ID should be available");
396
397        // Check if the process has already exited
398        match child.try_wait() {
399            Ok(Some(status)) => {
400                info!("Reth node (PID: {}) has already exited with status: {:?}", pid, status);
401                return Ok(());
402            }
403            Ok(None) => {
404                // Process is still running, proceed to stop it
405                info!("Stopping process gracefully with SIGINT (PID: {})...", pid);
406            }
407            Err(e) => {
408                return Err(eyre!("Failed to check process status: {}", e));
409            }
410        }
411
412        #[cfg(unix)]
413        {
414            // Send SIGINT to process group to mimic Ctrl-C behavior
415            let nix_pgid = Pid::from_raw(pid as i32);
416
417            match killpg(nix_pgid, Signal::SIGINT) {
418                Ok(()) => {}
419                Err(nix::errno::Errno::ESRCH) => {
420                    info!("Process group {} has already exited", pid);
421                }
422                Err(e) => {
423                    return Err(eyre!("Failed to send SIGINT to process group {}: {}", pid, e));
424                }
425            }
426        }
427
428        #[cfg(not(unix))]
429        {
430            // On non-Unix systems, fall back to using external kill command
431            let output = Command::new("taskkill")
432                .args(["/PID", &pid.to_string(), "/F"])
433                .output()
434                .await
435                .wrap_err("Failed to execute taskkill command")?;
436
437            if !output.status.success() {
438                let stderr = String::from_utf8_lossy(&output.stderr);
439                // Check if the error is because the process doesn't exist
440                if stderr.contains("not found") || stderr.contains("not exist") {
441                    info!("Process {} has already exited", pid);
442                } else {
443                    return Err(eyre!("Failed to kill process {}: {}", pid, stderr));
444                }
445            }
446        }
447
448        // Wait for the process to exit
449        match child.wait().await {
450            Ok(status) => {
451                info!("Reth node (PID: {}) exited with status: {:?}", pid, status);
452            }
453            Err(e) => {
454                // If we get an error here, it might be because the process already exited
455                debug!("Error waiting for process exit (may have already exited): {}", e);
456            }
457        }
458
459        Ok(())
460    }
461
462    /// Unwind the node to a specific block
463    pub(crate) async fn unwind_to_block(&self, block_number: u64) -> Result<()> {
464        if self.use_sudo {
465            info!("Unwinding node to block: {} (with sudo)", block_number);
466        } else {
467            info!("Unwinding node to block: {}", block_number);
468        }
469
470        // Use the binary path from the last start_node call, or fallback to default
471        let binary_path = self
472            .binary_path
473            .as_ref()
474            .map(|p| p.to_string_lossy().to_string())
475            .unwrap_or_else(|| "./target/profiling/reth".to_string());
476
477        let mut cmd = if self.use_sudo {
478            let mut sudo_cmd = Command::new("sudo");
479            sudo_cmd.args([&binary_path, "stage", "unwind"]);
480            sudo_cmd
481        } else {
482            let mut reth_cmd = Command::new(&binary_path);
483            reth_cmd.args(["stage", "unwind"]);
484            reth_cmd
485        };
486
487        // Add chain argument (skip for mainnet as it's the default)
488        let chain_str = self.chain.to_string();
489        if chain_str != "mainnet" {
490            cmd.args(["--chain", &chain_str]);
491        }
492
493        // Add datadir if specified
494        if let Some(ref datadir) = self.datadir {
495            cmd.args(["--datadir", datadir]);
496        }
497
498        cmd.args(["to-block", &block_number.to_string()]);
499
500        // Set environment variable to disable log styling
501        cmd.env("RUST_LOG_STYLE", "never");
502
503        // Debug log the command
504        debug!("Executing reth unwind command: {:?}", cmd);
505
506        let mut child = cmd
507            .stdout(std::process::Stdio::piped())
508            .stderr(std::process::Stdio::piped())
509            .spawn()
510            .wrap_err("Failed to start unwind command")?;
511
512        // Stream stdout and stderr with prefixes in real-time
513        if let Some(stdout) = child.stdout.take() {
514            tokio::spawn(async move {
515                let reader = AsyncBufReader::new(stdout);
516                let mut lines = reader.lines();
517                while let Ok(Some(line)) = lines.next_line().await {
518                    debug!("[RETH-UNWIND] {}", line);
519                }
520            });
521        }
522
523        if let Some(stderr) = child.stderr.take() {
524            tokio::spawn(async move {
525                let reader = AsyncBufReader::new(stderr);
526                let mut lines = reader.lines();
527                while let Ok(Some(line)) = lines.next_line().await {
528                    debug!("[RETH-UNWIND] {}", line);
529                }
530            });
531        }
532
533        // Wait for the command to complete
534        let status = child.wait().await.wrap_err("Failed to wait for unwind command")?;
535
536        if !status.success() {
537            return Err(eyre!("Unwind command failed with exit code: {:?}", status.code()));
538        }
539
540        info!("Unwound to block: {}", block_number);
541        Ok(())
542    }
543}