reth_bench_compare/
node.rs

1//! Node management for starting, stopping, and controlling reth instances.
2
3use crate::cli::Args;
4use alloy_provider::{Provider, ProviderBuilder};
5use alloy_rpc_types_eth::SyncStatus;
6use eyre::{eyre, OptionExt, Result, WrapErr};
7#[cfg(unix)]
8use nix::sys::signal::{killpg, Signal};
9#[cfg(unix)]
10use nix::unistd::Pid;
11use reth_chainspec::Chain;
12use std::{fs, path::PathBuf, time::Duration};
13use tokio::{
14    fs::File as AsyncFile,
15    io::{AsyncBufReadExt, AsyncWriteExt, BufReader as AsyncBufReader},
16    process::Command,
17    time::{sleep, timeout},
18};
19use tracing::{debug, info, warn};
20
21/// Manages reth node lifecycle and operations
22pub(crate) struct NodeManager {
23    datadir: Option<String>,
24    metrics_port: u16,
25    chain: Chain,
26    use_sudo: bool,
27    binary_path: Option<std::path::PathBuf>,
28    enable_profiling: bool,
29    output_dir: PathBuf,
30    additional_reth_args: Vec<String>,
31    comparison_dir: Option<PathBuf>,
32    tracing_endpoint: Option<String>,
33    otlp_max_queue_size: usize,
34}
35
36impl NodeManager {
37    /// Create a new `NodeManager` with configuration from CLI args
38    pub(crate) fn new(args: &Args) -> Self {
39        Self {
40            datadir: Some(args.datadir_path().to_string_lossy().to_string()),
41            metrics_port: args.metrics_port,
42            chain: args.chain,
43            use_sudo: args.sudo,
44            binary_path: None,
45            enable_profiling: args.profile,
46            output_dir: args.output_dir_path(),
47            // Filter out empty strings to prevent invalid arguments being passed to reth node
48            additional_reth_args: args
49                .reth_args
50                .iter()
51                .filter(|s| !s.is_empty())
52                .cloned()
53                .collect(),
54            comparison_dir: None,
55            tracing_endpoint: args.traces.otlp.as_ref().map(|u| u.to_string()),
56            otlp_max_queue_size: args.otlp_max_queue_size,
57        }
58    }
59
60    /// Set the comparison directory path for logging
61    pub(crate) fn set_comparison_dir(&mut self, dir: PathBuf) {
62        self.comparison_dir = Some(dir);
63    }
64
65    /// Get the log file path for a given reference type
66    fn get_log_file_path(&self, ref_type: &str) -> Result<PathBuf> {
67        let comparison_dir = self
68            .comparison_dir
69            .as_ref()
70            .ok_or_eyre("Comparison directory not set. Call set_comparison_dir first.")?;
71
72        // The comparison directory already contains the full path to results/<timestamp>
73        let log_dir = comparison_dir.join(ref_type);
74
75        // Create the directory if it doesn't exist
76        fs::create_dir_all(&log_dir)
77            .wrap_err(format!("Failed to create log directory: {:?}", log_dir))?;
78
79        let log_file = log_dir.join("reth_node.log");
80        Ok(log_file)
81    }
82
83    /// Get the perf event max sample rate from the system, capped at 10000
84    fn get_perf_sample_rate(&self) -> Option<String> {
85        let perf_rate_file = "/proc/sys/kernel/perf_event_max_sample_rate";
86        if let Ok(content) = fs::read_to_string(perf_rate_file) {
87            let rate_str = content.trim();
88            if !rate_str.is_empty() {
89                if let Ok(system_rate) = rate_str.parse::<u32>() {
90                    let capped_rate = std::cmp::min(system_rate, 10000);
91                    info!(
92                        "Detected perf_event_max_sample_rate: {}, using: {}",
93                        system_rate, capped_rate
94                    );
95                    return Some(capped_rate.to_string());
96                }
97                warn!("Failed to parse perf_event_max_sample_rate: {}", rate_str);
98            }
99        }
100        None
101    }
102
103    /// Get the absolute path to samply using 'which' command
104    async fn get_samply_path(&self) -> Result<String> {
105        let output = Command::new("which")
106            .arg("samply")
107            .output()
108            .await
109            .wrap_err("Failed to execute 'which samply' command")?;
110
111        if !output.status.success() {
112            return Err(eyre!("samply not found in PATH"));
113        }
114
115        let samply_path = String::from_utf8(output.stdout)
116            .wrap_err("samply path is not valid UTF-8")?
117            .trim()
118            .to_string();
119
120        if samply_path.is_empty() {
121            return Err(eyre!("which samply returned empty path"));
122        }
123
124        Ok(samply_path)
125    }
126
127    /// Build reth arguments as a vector of strings
128    fn build_reth_args(
129        &self,
130        binary_path_str: &str,
131        additional_args: &[String],
132        ref_type: &str,
133    ) -> (Vec<String>, String) {
134        let mut reth_args = vec![binary_path_str.to_string(), "node".to_string()];
135
136        // Add chain argument (skip for mainnet as it's the default)
137        let chain_str = self.chain.to_string();
138        if chain_str != "mainnet" {
139            reth_args.extend_from_slice(&["--chain".to_string(), chain_str.clone()]);
140        }
141
142        // Add datadir if specified
143        if let Some(ref datadir) = self.datadir {
144            reth_args.extend_from_slice(&["--datadir".to_string(), datadir.clone()]);
145        }
146
147        // Add reth-specific arguments
148        let metrics_arg = format!("0.0.0.0:{}", self.metrics_port);
149        reth_args.extend_from_slice(&[
150            "--engine.accept-execution-requests-hash".to_string(),
151            "--metrics".to_string(),
152            metrics_arg,
153            "--http".to_string(),
154            "--http.api".to_string(),
155            "eth".to_string(),
156            "--disable-discovery".to_string(),
157            "--trusted-only".to_string(),
158        ]);
159
160        // Add tracing arguments if OTLP endpoint is configured
161        if let Some(ref endpoint) = self.tracing_endpoint {
162            info!("Enabling OTLP tracing export to: {} (service: reth-{})", endpoint, ref_type);
163            // Endpoint requires equals per clap settings in reth
164            reth_args.push(format!("--tracing-otlp={}", endpoint));
165        }
166
167        // Add any additional arguments passed via command line (common to both baseline and
168        // feature)
169        reth_args.extend_from_slice(&self.additional_reth_args);
170
171        // Add reference-specific additional arguments
172        reth_args.extend_from_slice(additional_args);
173
174        (reth_args, chain_str)
175    }
176
177    /// Create a command for profiling mode
178    async fn create_profiling_command(
179        &self,
180        ref_type: &str,
181        reth_args: &[String],
182    ) -> Result<Command> {
183        // Create profiles directory if it doesn't exist
184        let profile_dir = self.output_dir.join("profiles");
185        fs::create_dir_all(&profile_dir).wrap_err("Failed to create profiles directory")?;
186
187        let profile_path = profile_dir.join(format!("{}.json.gz", ref_type));
188        info!("Starting reth node with samply profiling...");
189        info!("Profile output: {:?}", profile_path);
190
191        // Get absolute path to samply
192        let samply_path = self.get_samply_path().await?;
193
194        let mut cmd = if self.use_sudo {
195            let mut sudo_cmd = Command::new("sudo");
196            sudo_cmd.arg(&samply_path);
197            sudo_cmd
198        } else {
199            Command::new(&samply_path)
200        };
201
202        // Add samply arguments
203        cmd.args(["record", "--save-only", "-o", &profile_path.to_string_lossy()]);
204
205        // Add rate argument if available
206        if let Some(rate) = self.get_perf_sample_rate() {
207            cmd.args(["--rate", &rate]);
208        }
209
210        // Add separator and complete reth command
211        cmd.arg("--");
212        cmd.args(reth_args);
213
214        // Enable tracing-samply
215        if supports_samply_flags(&reth_args[0]) {
216            cmd.arg("--log.samply");
217        }
218
219        // Set environment variable to disable log styling
220        cmd.env("RUST_LOG_STYLE", "never");
221
222        Ok(cmd)
223    }
224
225    /// Create a command for direct reth execution
226    fn create_direct_command(&self, reth_args: &[String]) -> Command {
227        let binary_path = &reth_args[0];
228
229        let mut cmd = if self.use_sudo {
230            info!("Starting reth node with sudo...");
231            let mut sudo_cmd = Command::new("sudo");
232            sudo_cmd.args(reth_args);
233            sudo_cmd
234        } else {
235            info!("Starting reth node...");
236            let mut reth_cmd = Command::new(binary_path);
237            reth_cmd.args(&reth_args[1..]); // Skip the binary path since it's the command
238            reth_cmd
239        };
240
241        // Set environment variable to disable log styling
242        cmd.env("RUST_LOG_STYLE", "never");
243
244        cmd
245    }
246
247    /// Start a reth node using the specified binary path and return the process handle
248    /// along with the formatted reth command string for reporting.
249    pub(crate) async fn start_node(
250        &mut self,
251        binary_path: &std::path::Path,
252        _git_ref: &str,
253        ref_type: &str,
254        additional_args: &[String],
255    ) -> Result<(tokio::process::Child, String)> {
256        // Store the binary path for later use (e.g., in unwind_to_block)
257        self.binary_path = Some(binary_path.to_path_buf());
258
259        let binary_path_str = binary_path.to_string_lossy();
260        let (reth_args, _) = self.build_reth_args(&binary_path_str, additional_args, ref_type);
261
262        // Format the reth command string for reporting
263        let reth_command = shlex::try_join(reth_args.iter().map(|s| s.as_str()))
264            .wrap_err("Failed to format reth command string")?;
265
266        // Log additional arguments if any
267        if !self.additional_reth_args.is_empty() {
268            info!("Using common additional reth arguments: {:?}", self.additional_reth_args);
269        }
270        if !additional_args.is_empty() {
271            info!("Using reference-specific additional reth arguments: {:?}", additional_args);
272        }
273
274        let mut cmd = if self.enable_profiling {
275            self.create_profiling_command(ref_type, &reth_args).await?
276        } else {
277            self.create_direct_command(&reth_args)
278        };
279
280        // Set process group for better signal handling
281        #[cfg(unix)]
282        {
283            cmd.process_group(0);
284        }
285
286        // Set high queue size to prevent trace dropping during benchmarks
287        if self.tracing_endpoint.is_some() {
288            cmd.env("OTEL_BSP_MAX_QUEUE_SIZE", self.otlp_max_queue_size.to_string()); // Traces
289            cmd.env("OTEL_BLRP_MAX_QUEUE_SIZE", "10000"); // Logs
290
291            // Set service name to differentiate baseline vs feature runs in Jaeger
292            cmd.env("OTEL_SERVICE_NAME", format!("reth-{}", ref_type));
293        }
294
295        debug!("Executing reth command: {cmd:?}");
296
297        let mut child = cmd
298            .stdout(std::process::Stdio::piped())
299            .stderr(std::process::Stdio::piped())
300            .kill_on_drop(true) // Kill on drop so that on Ctrl-C for parent process we stop all child processes
301            .spawn()
302            .wrap_err("Failed to start reth node")?;
303
304        info!(
305            "Reth node started with PID: {:?} (binary: {})",
306            child.id().ok_or_eyre("Reth node is not running")?,
307            binary_path_str
308        );
309
310        // Prepare log file path
311        let log_file_path = self.get_log_file_path(ref_type)?;
312        info!("Reth node logs will be saved to: {:?}", log_file_path);
313
314        // Stream stdout and stderr with prefixes at debug level and to log file
315        if let Some(stdout) = child.stdout.take() {
316            let log_file = AsyncFile::create(&log_file_path)
317                .await
318                .wrap_err(format!("Failed to create log file: {:?}", log_file_path))?;
319            tokio::spawn(async move {
320                let reader = AsyncBufReader::new(stdout);
321                let mut lines = reader.lines();
322                let mut log_file = log_file;
323                while let Ok(Some(line)) = lines.next_line().await {
324                    debug!("[RETH] {}", line);
325                    // Write to log file (reth already includes timestamps)
326                    let log_line = format!("{}\n", line);
327                    if let Err(e) = log_file.write_all(log_line.as_bytes()).await {
328                        debug!("Failed to write to log file: {}", e);
329                    }
330                }
331            });
332        }
333
334        if let Some(stderr) = child.stderr.take() {
335            let log_file = AsyncFile::options()
336                .create(true)
337                .append(true)
338                .open(&log_file_path)
339                .await
340                .wrap_err(format!("Failed to open log file for stderr: {:?}", log_file_path))?;
341            tokio::spawn(async move {
342                let reader = AsyncBufReader::new(stderr);
343                let mut lines = reader.lines();
344                let mut log_file = log_file;
345                while let Ok(Some(line)) = lines.next_line().await {
346                    debug!("[RETH] {}", line);
347                    // Write to log file (reth already includes timestamps)
348                    let log_line = format!("{}\n", line);
349                    if let Err(e) = log_file.write_all(log_line.as_bytes()).await {
350                        debug!("Failed to write to log file: {}", e);
351                    }
352                }
353            });
354        }
355
356        // Give the node a moment to start up
357        sleep(Duration::from_secs(5)).await;
358
359        Ok((child, reth_command))
360    }
361
362    /// Wait for the node to be ready and return its current tip
363    pub(crate) async fn wait_for_node_ready_and_get_tip(&self) -> Result<u64> {
364        info!("Waiting for node to be ready and synced...");
365
366        let max_wait = Duration::from_secs(120); // 2 minutes to allow for sync
367        let check_interval = Duration::from_secs(2);
368        let rpc_url = "http://localhost:8545";
369
370        // Create Alloy provider
371        let url = rpc_url.parse().map_err(|e| eyre!("Invalid RPC URL '{}': {}", rpc_url, e))?;
372        let provider = ProviderBuilder::new().connect_http(url);
373
374        timeout(max_wait, async {
375            loop {
376                // First check if RPC is up and node is not syncing
377                match provider.syncing().await {
378                    Ok(sync_result) => {
379                        match sync_result {
380                            SyncStatus::Info(sync_info) => {
381                                debug!("Node is still syncing {sync_info:?}, waiting...");
382                            }
383                            _ => {
384                                // Node is not syncing, now get the tip
385                                match provider.get_block_number().await {
386                                    Ok(tip) => {
387                                        info!("Node is ready and not syncing at block: {}", tip);
388                                        return Ok(tip);
389                                    }
390                                    Err(e) => {
391                                        debug!("Failed to get block number: {}", e);
392                                    }
393                                }
394                            }
395                        }
396                    }
397                    Err(e) => {
398                        debug!("Node RPC not ready yet or failed to check sync status: {}", e);
399                    }
400                }
401
402                sleep(check_interval).await;
403            }
404        })
405        .await
406        .wrap_err("Timed out waiting for node to be ready and synced")?
407    }
408
409    /// Stop the reth node gracefully
410    pub(crate) async fn stop_node(&self, child: &mut tokio::process::Child) -> Result<()> {
411        let pid = child.id().expect("Child process ID should be available");
412
413        // Check if the process has already exited
414        match child.try_wait() {
415            Ok(Some(status)) => {
416                info!("Reth node (PID: {}) has already exited with status: {:?}", pid, status);
417                return Ok(());
418            }
419            Ok(None) => {
420                // Process is still running, proceed to stop it
421                info!("Stopping process gracefully with SIGINT (PID: {})...", pid);
422            }
423            Err(e) => {
424                return Err(eyre!("Failed to check process status: {}", e));
425            }
426        }
427
428        #[cfg(unix)]
429        {
430            // Send SIGINT to process group to mimic Ctrl-C behavior
431            let nix_pgid = Pid::from_raw(pid as i32);
432
433            match killpg(nix_pgid, Signal::SIGINT) {
434                Ok(()) => {}
435                Err(nix::errno::Errno::ESRCH) => {
436                    info!("Process group {} has already exited", pid);
437                }
438                Err(e) => {
439                    return Err(eyre!("Failed to send SIGINT to process group {}: {}", pid, e));
440                }
441            }
442        }
443
444        #[cfg(not(unix))]
445        {
446            // On non-Unix systems, fall back to using external kill command
447            let output = Command::new("taskkill")
448                .args(["/PID", &pid.to_string(), "/F"])
449                .output()
450                .await
451                .wrap_err("Failed to execute taskkill command")?;
452
453            if !output.status.success() {
454                let stderr = String::from_utf8_lossy(&output.stderr);
455                // Check if the error is because the process doesn't exist
456                if stderr.contains("not found") || stderr.contains("not exist") {
457                    info!("Process {} has already exited", pid);
458                } else {
459                    return Err(eyre!("Failed to kill process {}: {}", pid, stderr));
460                }
461            }
462        }
463
464        // Wait for the process to exit
465        match child.wait().await {
466            Ok(status) => {
467                info!("Reth node (PID: {}) exited with status: {:?}", pid, status);
468            }
469            Err(e) => {
470                // If we get an error here, it might be because the process already exited
471                debug!("Error waiting for process exit (may have already exited): {}", e);
472            }
473        }
474
475        Ok(())
476    }
477
478    /// Unwind the node to a specific block
479    pub(crate) async fn unwind_to_block(&self, block_number: u64) -> Result<()> {
480        if self.use_sudo {
481            info!("Unwinding node to block: {} (with sudo)", block_number);
482        } else {
483            info!("Unwinding node to block: {}", block_number);
484        }
485
486        // Use the binary path from the last start_node call, or fallback to default
487        let binary_path = self
488            .binary_path
489            .as_ref()
490            .map(|p| p.to_string_lossy().to_string())
491            .unwrap_or_else(|| "./target/profiling/reth".to_string());
492
493        let mut cmd = if self.use_sudo {
494            let mut sudo_cmd = Command::new("sudo");
495            sudo_cmd.args([&binary_path, "stage", "unwind"]);
496            sudo_cmd
497        } else {
498            let mut reth_cmd = Command::new(&binary_path);
499            reth_cmd.args(["stage", "unwind"]);
500            reth_cmd
501        };
502
503        // Add chain argument (skip for mainnet as it's the default)
504        let chain_str = self.chain.to_string();
505        if chain_str != "mainnet" {
506            cmd.args(["--chain", &chain_str]);
507        }
508
509        // Add datadir if specified
510        if let Some(ref datadir) = self.datadir {
511            cmd.args(["--datadir", datadir]);
512        }
513
514        cmd.args(["to-block", &block_number.to_string()]);
515
516        // Set environment variable to disable log styling
517        cmd.env("RUST_LOG_STYLE", "never");
518
519        // Debug log the command
520        debug!("Executing reth unwind command: {:?}", cmd);
521
522        let mut child = cmd
523            .stdout(std::process::Stdio::piped())
524            .stderr(std::process::Stdio::piped())
525            .spawn()
526            .wrap_err("Failed to start unwind command")?;
527
528        // Stream stdout and stderr with prefixes in real-time
529        if let Some(stdout) = child.stdout.take() {
530            tokio::spawn(async move {
531                let reader = AsyncBufReader::new(stdout);
532                let mut lines = reader.lines();
533                while let Ok(Some(line)) = lines.next_line().await {
534                    debug!("[RETH-UNWIND] {}", line);
535                }
536            });
537        }
538
539        if let Some(stderr) = child.stderr.take() {
540            tokio::spawn(async move {
541                let reader = AsyncBufReader::new(stderr);
542                let mut lines = reader.lines();
543                while let Ok(Some(line)) = lines.next_line().await {
544                    debug!("[RETH-UNWIND] {}", line);
545                }
546            });
547        }
548
549        // Wait for the command to complete
550        let status = child.wait().await.wrap_err("Failed to wait for unwind command")?;
551
552        if !status.success() {
553            return Err(eyre!("Unwind command failed with exit code: {:?}", status.code()));
554        }
555
556        info!("Unwound to block: {}", block_number);
557        Ok(())
558    }
559}
560
561fn supports_samply_flags(bin: &str) -> bool {
562    let mut cmd = std::process::Command::new(bin);
563    // NOTE: The flag to check must come before --help.
564    // We pass --help as a shortcut to not execute any command.
565    cmd.args(["--log.samply", "--help"]);
566    debug!(?cmd, "Checking samply flags support");
567    let Ok(output) = cmd.output() else {
568        return false;
569    };
570    debug!(?output, "Samply flags support check");
571    output.status.success()
572}