Skip to main content

reth_bench_compare/
node.rs

1//! Node management for starting, stopping, and controlling reth instances.
2
3use crate::cli::Args;
4use alloy_provider::{Provider, ProviderBuilder};
5use alloy_rpc_client::RpcClient;
6use alloy_rpc_types_eth::SyncStatus;
7use alloy_transport_ws::WsConnect;
8use eyre::{eyre, OptionExt, Result, WrapErr};
9#[cfg(unix)]
10use nix::sys::signal::{killpg, Signal};
11#[cfg(unix)]
12use nix::unistd::Pid;
13use reth_chainspec::Chain;
14use std::{fs, path::PathBuf, time::Duration};
15use tokio::{
16    fs::File as AsyncFile,
17    io::{AsyncBufReadExt, AsyncWriteExt, BufReader as AsyncBufReader},
18    process::Command,
19    time::{sleep, timeout},
20};
21use tracing::{debug, info, warn};
22
/// WebSocket RPC port reth listens on by default; readiness checks connect to
/// `ws://localhost:<this port>`.
const DEFAULT_WS_RPC_PORT: u16 = 8_546;
25
26/// Manages reth node lifecycle and operations
27pub(crate) struct NodeManager {
28    datadir: Option<String>,
29    metrics_port: u16,
30    chain: Chain,
31    use_sudo: bool,
32    binary_path: Option<std::path::PathBuf>,
33    enable_profiling: bool,
34    output_dir: PathBuf,
35    additional_reth_args: Vec<String>,
36    comparison_dir: Option<PathBuf>,
37    tracing_endpoint: Option<String>,
38    otlp_max_queue_size: usize,
39}
40
41impl NodeManager {
42    /// Create a new `NodeManager` with configuration from CLI args
43    pub(crate) fn new(args: &Args) -> Self {
44        Self {
45            datadir: Some(args.datadir_path().to_string_lossy().to_string()),
46            metrics_port: args.metrics_port,
47            chain: args.chain,
48            use_sudo: args.sudo,
49            binary_path: None,
50            enable_profiling: args.profile,
51            output_dir: args.output_dir_path(),
52            // Filter out empty strings to prevent invalid arguments being passed to reth node
53            additional_reth_args: args
54                .reth_args
55                .iter()
56                .filter(|s| !s.is_empty())
57                .cloned()
58                .collect(),
59            comparison_dir: None,
60            tracing_endpoint: args.traces.otlp.as_ref().map(|u| u.to_string()),
61            otlp_max_queue_size: args.otlp_max_queue_size,
62        }
63    }
64
65    /// Set the comparison directory path for logging
66    pub(crate) fn set_comparison_dir(&mut self, dir: PathBuf) {
67        self.comparison_dir = Some(dir);
68    }
69
70    /// Get the log file path for a given reference type
71    fn get_log_file_path(&self, ref_type: &str) -> Result<PathBuf> {
72        let comparison_dir = self
73            .comparison_dir
74            .as_ref()
75            .ok_or_eyre("Comparison directory not set. Call set_comparison_dir first.")?;
76
77        // The comparison directory already contains the full path to results/<timestamp>
78        let log_dir = comparison_dir.join(ref_type);
79
80        // Create the directory if it doesn't exist
81        fs::create_dir_all(&log_dir)
82            .wrap_err(format!("Failed to create log directory: {:?}", log_dir))?;
83
84        let log_file = log_dir.join("reth_node.log");
85        Ok(log_file)
86    }
87
88    /// Get the perf event max sample rate from the system, capped at 10000
89    fn get_perf_sample_rate(&self) -> Option<String> {
90        let perf_rate_file = "/proc/sys/kernel/perf_event_max_sample_rate";
91        if let Ok(content) = fs::read_to_string(perf_rate_file) {
92            let rate_str = content.trim();
93            if !rate_str.is_empty() {
94                if let Ok(system_rate) = rate_str.parse::<u32>() {
95                    let capped_rate = std::cmp::min(system_rate, 10000);
96                    info!(
97                        "Detected perf_event_max_sample_rate: {}, using: {}",
98                        system_rate, capped_rate
99                    );
100                    return Some(capped_rate.to_string());
101                }
102                warn!("Failed to parse perf_event_max_sample_rate: {}", rate_str);
103            }
104        }
105        None
106    }
107
108    /// Get the absolute path to samply using 'which' command
109    async fn get_samply_path(&self) -> Result<String> {
110        let output = Command::new("which")
111            .arg("samply")
112            .output()
113            .await
114            .wrap_err("Failed to execute 'which samply' command")?;
115
116        if !output.status.success() {
117            return Err(eyre!("samply not found in PATH"));
118        }
119
120        let samply_path = String::from_utf8(output.stdout)
121            .wrap_err("samply path is not valid UTF-8")?
122            .trim()
123            .to_string();
124
125        if samply_path.is_empty() {
126            return Err(eyre!("which samply returned empty path"));
127        }
128
129        Ok(samply_path)
130    }
131
132    /// Build reth arguments as a vector of strings
133    fn build_reth_args(
134        &self,
135        binary_path_str: &str,
136        additional_args: &[String],
137        ref_type: &str,
138    ) -> (Vec<String>, String) {
139        let mut reth_args = vec![binary_path_str.to_string(), "node".to_string()];
140
141        // Add chain argument (skip for mainnet as it's the default)
142        let chain_str = self.chain.to_string();
143        if chain_str != "mainnet" {
144            reth_args.extend_from_slice(&["--chain".to_string(), chain_str.clone()]);
145        }
146
147        // Add datadir if specified
148        if let Some(ref datadir) = self.datadir {
149            reth_args.extend_from_slice(&["--datadir".to_string(), datadir.clone()]);
150        }
151
152        // Add reth-specific arguments
153        let metrics_arg = format!("0.0.0.0:{}", self.metrics_port);
154        reth_args.extend_from_slice(&[
155            "--engine.accept-execution-requests-hash".to_string(),
156            "--metrics".to_string(),
157            metrics_arg,
158            "--http".to_string(),
159            "--http.api".to_string(),
160            "eth,reth".to_string(),
161            "--ws".to_string(),
162            "--ws.api".to_string(),
163            "eth,reth".to_string(),
164            "--disable-discovery".to_string(),
165            "--trusted-only".to_string(),
166            "--disable-tx-gossip".to_string(),
167        ]);
168
169        // Add tracing arguments if OTLP endpoint is configured
170        if let Some(ref endpoint) = self.tracing_endpoint {
171            info!("Enabling OTLP tracing export to: {} (service: reth-{})", endpoint, ref_type);
172            // Endpoint requires equals per clap settings in reth
173            reth_args.push(format!("--tracing-otlp={}", endpoint));
174        }
175
176        // Add any additional arguments passed via command line (common to both baseline and
177        // feature)
178        reth_args.extend_from_slice(&self.additional_reth_args);
179
180        // Add reference-specific additional arguments
181        reth_args.extend_from_slice(additional_args);
182
183        (reth_args, chain_str)
184    }
185
186    /// Create a command for profiling mode
187    async fn create_profiling_command(
188        &self,
189        ref_type: &str,
190        reth_args: &[String],
191    ) -> Result<Command> {
192        // Create profiles directory if it doesn't exist
193        let profile_dir = self.output_dir.join("profiles");
194        fs::create_dir_all(&profile_dir).wrap_err("Failed to create profiles directory")?;
195
196        let profile_path = profile_dir.join(format!("{}.json.gz", ref_type));
197        info!("Starting reth node with samply profiling...");
198        info!("Profile output: {:?}", profile_path);
199
200        // Get absolute path to samply
201        let samply_path = self.get_samply_path().await?;
202
203        let mut cmd = if self.use_sudo {
204            let mut sudo_cmd = Command::new("sudo");
205            sudo_cmd.arg(&samply_path);
206            sudo_cmd
207        } else {
208            Command::new(&samply_path)
209        };
210
211        // Add samply arguments
212        cmd.args(["record", "--save-only", "-o", &profile_path.to_string_lossy()]);
213
214        // Add rate argument if available
215        if let Some(rate) = self.get_perf_sample_rate() {
216            cmd.args(["--rate", &rate]);
217        }
218
219        // Add separator and complete reth command
220        cmd.arg("--");
221        cmd.args(reth_args);
222
223        // Enable tracing-samply
224        if supports_samply_flags(&reth_args[0]) {
225            cmd.arg("--log.samply");
226        }
227
228        // Set environment variable to disable log styling
229        cmd.env("RUST_LOG_STYLE", "never");
230
231        Ok(cmd)
232    }
233
234    /// Create a command for direct reth execution
235    fn create_direct_command(&self, reth_args: &[String]) -> Command {
236        let binary_path = &reth_args[0];
237
238        let mut cmd = if self.use_sudo {
239            info!("Starting reth node with sudo...");
240            let mut sudo_cmd = Command::new("sudo");
241            sudo_cmd.args(reth_args);
242            sudo_cmd
243        } else {
244            info!("Starting reth node...");
245            let mut reth_cmd = Command::new(binary_path);
246            reth_cmd.args(&reth_args[1..]); // Skip the binary path since it's the command
247            reth_cmd
248        };
249
250        // Set environment variable to disable log styling
251        cmd.env("RUST_LOG_STYLE", "never");
252
253        cmd
254    }
255
256    /// Start a reth node using the specified binary path and return the process handle
257    /// along with the formatted reth command string for reporting.
258    pub(crate) async fn start_node(
259        &mut self,
260        binary_path: &std::path::Path,
261        _git_ref: &str,
262        ref_type: &str,
263        additional_args: &[String],
264    ) -> Result<(tokio::process::Child, String)> {
265        // Store the binary path for later use (e.g., in unwind_to_block)
266        self.binary_path = Some(binary_path.to_path_buf());
267
268        let binary_path_str = binary_path.to_string_lossy();
269        let (reth_args, _) = self.build_reth_args(&binary_path_str, additional_args, ref_type);
270
271        // Format the reth command string for reporting
272        let reth_command = shlex::try_join(reth_args.iter().map(|s| s.as_str()))
273            .wrap_err("Failed to format reth command string")?;
274
275        // Log additional arguments if any
276        if !self.additional_reth_args.is_empty() {
277            info!("Using common additional reth arguments: {:?}", self.additional_reth_args);
278        }
279        if !additional_args.is_empty() {
280            info!("Using reference-specific additional reth arguments: {:?}", additional_args);
281        }
282
283        let mut cmd = if self.enable_profiling {
284            self.create_profiling_command(ref_type, &reth_args).await?
285        } else {
286            self.create_direct_command(&reth_args)
287        };
288
289        // Set process group for better signal handling
290        #[cfg(unix)]
291        {
292            cmd.process_group(0);
293        }
294
295        // Set high queue size to prevent trace dropping during benchmarks
296        if self.tracing_endpoint.is_some() {
297            cmd.env("OTEL_BSP_MAX_QUEUE_SIZE", self.otlp_max_queue_size.to_string()); // Traces
298            cmd.env("OTEL_BLRP_MAX_QUEUE_SIZE", "10000"); // Logs
299
300            // Set service name to differentiate baseline vs feature runs in Jaeger
301            cmd.env("OTEL_SERVICE_NAME", format!("reth-{}", ref_type));
302        }
303
304        debug!("Executing reth command: {cmd:?}");
305
306        let mut child = cmd
307            .stdout(std::process::Stdio::piped())
308            .stderr(std::process::Stdio::piped())
309            .kill_on_drop(true) // Kill on drop so that on Ctrl-C for parent process we stop all child processes
310            .spawn()
311            .wrap_err("Failed to start reth node")?;
312
313        info!(
314            "Reth node started with PID: {:?} (binary: {})",
315            child.id().ok_or_eyre("Reth node is not running")?,
316            binary_path_str
317        );
318
319        // Prepare log file path
320        let log_file_path = self.get_log_file_path(ref_type)?;
321        info!("Reth node logs will be saved to: {:?}", log_file_path);
322
323        // Stream stdout and stderr with prefixes at debug level and to log file
324        if let Some(stdout) = child.stdout.take() {
325            let log_file = AsyncFile::create(&log_file_path)
326                .await
327                .wrap_err(format!("Failed to create log file: {:?}", log_file_path))?;
328            tokio::spawn(async move {
329                let reader = AsyncBufReader::new(stdout);
330                let mut lines = reader.lines();
331                let mut log_file = log_file;
332                while let Ok(Some(line)) = lines.next_line().await {
333                    debug!("[RETH] {}", line);
334                    // Write to log file (reth already includes timestamps)
335                    let log_line = format!("{}\n", line);
336                    if let Err(e) = log_file.write_all(log_line.as_bytes()).await {
337                        debug!("Failed to write to log file: {}", e);
338                    }
339                }
340            });
341        }
342
343        if let Some(stderr) = child.stderr.take() {
344            let log_file = AsyncFile::options()
345                .create(true)
346                .append(true)
347                .open(&log_file_path)
348                .await
349                .wrap_err(format!("Failed to open log file for stderr: {:?}", log_file_path))?;
350            tokio::spawn(async move {
351                let reader = AsyncBufReader::new(stderr);
352                let mut lines = reader.lines();
353                let mut log_file = log_file;
354                while let Ok(Some(line)) = lines.next_line().await {
355                    debug!("[RETH] {}", line);
356                    // Write to log file (reth already includes timestamps)
357                    let log_line = format!("{}\n", line);
358                    if let Err(e) = log_file.write_all(log_line.as_bytes()).await {
359                        debug!("Failed to write to log file: {}", e);
360                    }
361                }
362            });
363        }
364
365        // Give the node a moment to start up
366        sleep(Duration::from_secs(5)).await;
367
368        Ok((child, reth_command))
369    }
370
371    /// Wait for the node to be ready and return its current tip.
372    ///
373    /// Fails early if the node process exits before becoming ready.
374    pub(crate) async fn wait_for_node_ready_and_get_tip(
375        &self,
376        child: &mut tokio::process::Child,
377    ) -> Result<u64> {
378        info!("Waiting for node to be ready and synced...");
379
380        let max_wait = Duration::from_secs(120); // 2 minutes to allow for sync
381        let check_interval = Duration::from_secs(2);
382        let rpc_url = "http://localhost:8545";
383
384        // Create Alloy provider
385        let url = rpc_url.parse().map_err(|e| eyre!("Invalid RPC URL '{}': {}", rpc_url, e))?;
386        let provider = ProviderBuilder::new().connect_http(url);
387
388        let start_time = tokio::time::Instant::now();
389        let mut iteration = 0;
390
391        timeout(max_wait, async {
392            loop {
393                iteration += 1;
394                debug!(
395                    "Readiness check iteration {} (elapsed: {:?})",
396                    iteration,
397                    start_time.elapsed()
398                );
399
400                // Check if the node process has exited.
401                if let Some(status) = child.try_wait()? {
402                    return Err(eyre!("Node process exited unexpectedly with {status}"));
403                }
404
405                // First check if RPC is up and node is not syncing
406                match provider.syncing().await {
407                    Ok(sync_result) => {
408                        match sync_result {
409                            SyncStatus::Info(sync_info) => {
410                                debug!("Node is still syncing {sync_info:?}, waiting...");
411                            }
412                            _ => {
413                                debug!("HTTP RPC is up and node is not syncing, checking block number...");
414                                // Node is not syncing, now get the tip
415                                match provider.get_block_number().await {
416                                    Ok(tip) => {
417                                        debug!("HTTP RPC ready at block: {}, checking WebSocket...", tip);
418
419                                        // Verify WebSocket RPC is ready (public endpoint, no JWT required)
420                                        let ws_url = format!("ws://localhost:{}", DEFAULT_WS_RPC_PORT);
421                                        debug!("Attempting WebSocket connection to {} (public endpoint)", ws_url);
422                                        let ws_connect = WsConnect::new(&ws_url);
423
424                                        match RpcClient::connect_pubsub(ws_connect).await
425                                        {
426                                            Ok(_) => {
427                                                info!(
428                                                    "Node is ready (HTTP and WebSocket) at block: {} (took {:?}, {} iterations)",
429                                                    tip, start_time.elapsed(), iteration
430                                                );
431                                                return Ok(tip);
432                                            }
433                                            Err(e) => {
434                                                debug!(
435                                                    "HTTP RPC ready but WebSocket not ready yet (iteration {}): {:?}",
436                                                    iteration, e
437                                                );
438                                                debug!("WebSocket error details: {}", e);
439                                            }
440                                        }
441                                    }
442                                    Err(e) => {
443                                        debug!("Failed to get block number (iteration {}): {:?}", iteration, e);
444                                    }
445                                }
446                            }
447                        }
448                    }
449                    Err(e) => {
450                        debug!("Node RPC not ready yet or failed to check sync status (iteration {}): {:?}", iteration, e);
451                    }
452                }
453
454                debug!("Sleeping for {:?} before next check", check_interval);
455                sleep(check_interval).await;
456            }
457        })
458        .await
459        .wrap_err("Timed out waiting for node to be ready and synced")?
460    }
461
462    /// Wait for the node RPC to be ready and return its current tip, without waiting for sync.
463    ///
464    /// This is faster than `wait_for_node_ready_and_get_tip` but may return a tip while
465    /// the node is still syncing.
466    pub(crate) async fn wait_for_rpc_and_get_tip(
467        &self,
468        child: &mut tokio::process::Child,
469    ) -> Result<u64> {
470        info!("Waiting for node RPC to be ready (skipping sync wait)...");
471
472        let max_wait = Duration::from_secs(60);
473        let check_interval = Duration::from_secs(2);
474        let rpc_url = "http://localhost:8545";
475
476        let url = rpc_url.parse().map_err(|e| eyre!("Invalid RPC URL '{}': {}", rpc_url, e))?;
477        let provider = ProviderBuilder::new().connect_http(url);
478
479        let start_time = tokio::time::Instant::now();
480        let mut iteration = 0;
481
482        timeout(max_wait, async {
483            loop {
484                iteration += 1;
485                debug!(
486                    "RPC readiness check iteration {} (elapsed: {:?})",
487                    iteration,
488                    start_time.elapsed()
489                );
490
491                if let Some(status) = child.try_wait()? {
492                    return Err(eyre!("Node process exited unexpectedly with {status}"));
493                }
494
495                match provider.get_block_number().await {
496                    Ok(tip) => {
497                        debug!("HTTP RPC ready at block: {}, checking WebSocket...", tip);
498
499                        let ws_url = format!("ws://localhost:{}", DEFAULT_WS_RPC_PORT);
500                        let ws_connect = WsConnect::new(&ws_url);
501
502                        match RpcClient::connect_pubsub(ws_connect).await {
503                            Ok(_) => {
504                                info!(
505                                    "Node RPC is ready at block: {} (took {:?}, {} iterations)",
506                                    tip,
507                                    start_time.elapsed(),
508                                    iteration
509                                );
510                                return Ok(tip);
511                            }
512                            Err(e) => {
513                                debug!(
514                                    "HTTP RPC ready but WebSocket not ready yet (iteration {}): {:?}",
515                                    iteration, e
516                                );
517                            }
518                        }
519                    }
520                    Err(e) => {
521                        debug!("RPC not ready yet (iteration {}): {:?}", iteration, e);
522                    }
523                }
524
525                sleep(check_interval).await;
526            }
527        })
528        .await
529        .wrap_err("Timed out waiting for node RPC to be ready")?
530    }
531
532    /// Stop the reth node gracefully
533    pub(crate) async fn stop_node(&self, child: &mut tokio::process::Child) -> Result<()> {
534        let pid = child.id().ok_or_eyre("Child process ID should be available")?;
535
536        // Check if the process has already exited
537        match child.try_wait() {
538            Ok(Some(status)) => {
539                info!("Reth node (PID: {}) has already exited with status: {:?}", pid, status);
540                return Ok(());
541            }
542            Ok(None) => {
543                // Process is still running, proceed to stop it
544                info!("Stopping process gracefully with SIGINT (PID: {})...", pid);
545            }
546            Err(e) => {
547                return Err(eyre!("Failed to check process status: {}", e));
548            }
549        }
550
551        #[cfg(unix)]
552        {
553            // Send SIGINT to process group to mimic Ctrl-C behavior
554            let nix_pgid = Pid::from_raw(pid as i32);
555
556            match killpg(nix_pgid, Signal::SIGINT) {
557                Ok(()) => {}
558                Err(nix::errno::Errno::ESRCH) => {
559                    info!("Process group {} has already exited", pid);
560                }
561                Err(e) => {
562                    return Err(eyre!("Failed to send SIGINT to process group {}: {}", pid, e));
563                }
564            }
565        }
566
567        #[cfg(not(unix))]
568        {
569            // On non-Unix systems, fall back to using external kill command
570            let output = Command::new("taskkill")
571                .args(["/PID", &pid.to_string(), "/F"])
572                .output()
573                .await
574                .wrap_err("Failed to execute taskkill command")?;
575
576            if !output.status.success() {
577                let stderr = String::from_utf8_lossy(&output.stderr);
578                // Check if the error is because the process doesn't exist
579                if stderr.contains("not found") || stderr.contains("not exist") {
580                    info!("Process {} has already exited", pid);
581                } else {
582                    return Err(eyre!("Failed to kill process {}: {}", pid, stderr));
583                }
584            }
585        }
586
587        // Wait for the process to exit
588        match child.wait().await {
589            Ok(status) => {
590                info!("Reth node (PID: {}) exited with status: {:?}", pid, status);
591            }
592            Err(e) => {
593                // If we get an error here, it might be because the process already exited
594                debug!("Error waiting for process exit (may have already exited): {}", e);
595            }
596        }
597
598        Ok(())
599    }
600
601    /// Unwind the node to a specific block
602    pub(crate) async fn unwind_to_block(&self, block_number: u64) -> Result<()> {
603        if self.use_sudo {
604            info!("Unwinding node to block: {} (with sudo)", block_number);
605        } else {
606            info!("Unwinding node to block: {}", block_number);
607        }
608
609        // Use the binary path from the last start_node call, or fallback to default
610        let binary_path = self
611            .binary_path
612            .as_ref()
613            .map(|p| p.to_string_lossy().to_string())
614            .unwrap_or_else(|| "./target/profiling/reth".to_string());
615
616        let mut cmd = if self.use_sudo {
617            let mut sudo_cmd = Command::new("sudo");
618            sudo_cmd.args([&binary_path, "stage", "unwind"]);
619            sudo_cmd
620        } else {
621            let mut reth_cmd = Command::new(&binary_path);
622            reth_cmd.args(["stage", "unwind"]);
623            reth_cmd
624        };
625
626        // Add chain argument (skip for mainnet as it's the default)
627        let chain_str = self.chain.to_string();
628        if chain_str != "mainnet" {
629            cmd.args(["--chain", &chain_str]);
630        }
631
632        // Add datadir if specified
633        if let Some(ref datadir) = self.datadir {
634            cmd.args(["--datadir", datadir]);
635        }
636
637        cmd.args(["to-block", &block_number.to_string()]);
638
639        // Set environment variable to disable log styling
640        cmd.env("RUST_LOG_STYLE", "never");
641
642        // Debug log the command
643        debug!("Executing reth unwind command: {:?}", cmd);
644
645        let mut child = cmd
646            .stdout(std::process::Stdio::piped())
647            .stderr(std::process::Stdio::piped())
648            .spawn()
649            .wrap_err("Failed to start unwind command")?;
650
651        // Stream stdout and stderr with prefixes in real-time
652        if let Some(stdout) = child.stdout.take() {
653            tokio::spawn(async move {
654                let reader = AsyncBufReader::new(stdout);
655                let mut lines = reader.lines();
656                while let Ok(Some(line)) = lines.next_line().await {
657                    debug!("[RETH-UNWIND] {}", line);
658                }
659            });
660        }
661
662        if let Some(stderr) = child.stderr.take() {
663            tokio::spawn(async move {
664                let reader = AsyncBufReader::new(stderr);
665                let mut lines = reader.lines();
666                while let Ok(Some(line)) = lines.next_line().await {
667                    debug!("[RETH-UNWIND] {}", line);
668                }
669            });
670        }
671
672        // Wait for the command to complete
673        let status = child.wait().await.wrap_err("Failed to wait for unwind command")?;
674
675        if !status.success() {
676            return Err(eyre!("Unwind command failed with exit code: {:?}", status.code()));
677        }
678
679        info!("Unwound to block: {}", block_number);
680        Ok(())
681    }
682}
683
684fn supports_samply_flags(bin: &str) -> bool {
685    let mut cmd = std::process::Command::new(bin);
686    // NOTE: The flag to check must come before --help.
687    // We pass --help as a shortcut to not execute any command.
688    cmd.args(["--log.samply", "--help"]);
689    debug!(?cmd, "Checking samply flags support");
690    let Ok(output) = cmd.output() else {
691        return false;
692    };
693    debug!(?output, "Samply flags support check");
694    output.status.success()
695}