reth_bench_compare/cli.rs

//! CLI argument parsing and main command orchestration.

use alloy_provider::{Provider, ProviderBuilder};
use clap::Parser;
use eyre::{eyre, Result, WrapErr};
use reth_chainspec::Chain;
use reth_cli_runner::CliContext;
use reth_node_core::args::{DatadirArgs, LogArgs, TraceArgs};
use reth_tracing::FileWorkerGuard;
use std::{net::TcpListener, path::PathBuf, str::FromStr};
use tokio::process::Command;
use tracing::{debug, info, warn};

use crate::{
    benchmark::BenchmarkRunner, comparison::ComparisonGenerator, compilation::CompilationManager,
    git::GitManager, node::NodeManager,
};

/// Target for disabling the --debug.startup-sync-state-idle flag
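///
/// Values are parsed case-insensitively from the CLI, e.g.
/// `--disable-startup-sync-state-idle Feature` selects `Feature`.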
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum DisableStartupSyncStateIdle {
    /// Disable for baseline and warmup runs
    Baseline,
    /// Disable for feature runs only
    Feature,
    /// Disable for all runs
    All,
}

impl FromStr for DisableStartupSyncStateIdle {
    type Err = String;

    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
        match s.to_lowercase().as_str() {
            "baseline" => Ok(Self::Baseline),
            "feature" => Ok(Self::Feature),
            "all" => Ok(Self::All),
            _ => Err(format!("Invalid value '{}'. Expected 'baseline', 'feature', or 'all'", s)),
        }
    }
}

impl std::fmt::Display for DisableStartupSyncStateIdle {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Baseline => write!(f, "baseline"),
            Self::Feature => write!(f, "feature"),
            Self::All => write!(f, "all"),
        }
    }
}

/// Automated reth benchmark comparison between git references
#[derive(Debug, Parser)]
#[command(
    name = "reth-bench-compare",
    about = "Compare reth performance between two git references (branches or tags)",
    version
)]
pub(crate) struct Args {
    /// Git reference (branch or tag) to use as baseline for comparison
    #[arg(long, value_name = "REF")]
    pub baseline_ref: String,

    /// Git reference (branch or tag) to compare against the baseline
    #[arg(long, value_name = "REF")]
    pub feature_ref: String,

    #[command(flatten)]
    pub datadir: DatadirArgs,

    /// Number of blocks to benchmark
    #[arg(long, value_name = "N", default_value = "100")]
    pub blocks: u64,

    /// RPC endpoint for fetching block data
    #[arg(long, value_name = "URL")]
    pub rpc_url: Option<String>,

    /// JWT secret file path
    ///
    /// If not provided, defaults to `<datadir>/<chain>/jwt.hex`.
    /// If the file doesn't exist, it will be created automatically.
    #[arg(long, value_name = "PATH")]
    pub jwt_secret: Option<PathBuf>,

    /// Output directory for benchmark results
    #[arg(long, value_name = "PATH", default_value = "./reth-bench-compare")]
    pub output_dir: String,

    /// Skip git branch validation (useful for testing)
    #[arg(long)]
    pub skip_git_validation: bool,

    /// Port for reth metrics endpoint
    #[arg(long, value_name = "PORT", default_value = "5005")]
    pub metrics_port: u16,

    /// The chain this node is running.
    ///
    /// Possible values are either a built-in chain name or numeric chain ID.
    #[arg(long, value_name = "CHAIN", default_value = "mainnet", required = false)]
    pub chain: Chain,

    /// Run reth binary with sudo (for elevated privileges)
    #[arg(long)]
    pub sudo: bool,

    /// Generate comparison charts using Python script
    #[arg(long)]
    pub draw: bool,

    /// Enable CPU profiling with samply during benchmark runs
    #[arg(long)]
    pub profile: bool,

    /// Wait time between engine API calls (passed to reth-bench)
    #[arg(long, value_name = "DURATION")]
    pub wait_time: Option<String>,

    /// Number of blocks to run for cache warmup after clearing caches.
    /// If not specified, defaults to the same as --blocks
    #[arg(long, value_name = "N")]
    pub warmup_blocks: Option<u64>,

    /// Disable filesystem cache clearing before warmup phase.
    /// By default, filesystem caches are cleared before warmup to ensure consistent benchmarks.
    #[arg(long)]
    pub no_clear_cache: bool,

    #[command(flatten)]
    pub logs: LogArgs,

    #[command(flatten)]
    pub traces: TraceArgs,

    /// Maximum queue size for OTLP Batch Span Processor (traces).
    /// Higher values prevent trace drops when benchmarking many blocks.
    #[arg(
        long,
        value_name = "OTLP_BUFFER_SIZE",
        default_value = "32768",
        help_heading = "Tracing"
    )]
    pub otlp_max_queue_size: usize,

    /// Additional arguments to pass to baseline reth node command
    ///
    /// Example: `--baseline-args "--debug.tip 0xabc..."`
    #[arg(long, value_name = "ARGS")]
    pub baseline_args: Option<String>,

    /// Additional arguments to pass to feature reth node command
    ///
    /// Example: `--feature-args "--debug.tip 0xdef..."`
    #[arg(long, value_name = "ARGS")]
    pub feature_args: Option<String>,

    /// Additional arguments to pass to reth node command (applied to both baseline and feature)
    ///
    /// All arguments after `--` will be passed directly to the reth node command.
    /// Example: `reth-bench-compare --baseline-ref main --feature-ref pr/123 -- --debug.tip
    /// 0xabc...`
    #[arg(trailing_var_arg = true, allow_hyphen_values = true)]
    pub reth_args: Vec<String>,

    /// Comma-separated list of features to enable during reth compilation
    ///
    /// Example: `jemalloc,asm-keccak`
    #[arg(long, value_name = "FEATURES", default_value = "jemalloc,asm-keccak")]
    pub features: String,

    /// Disable automatic --debug.startup-sync-state-idle flag for specific runs.
    /// Can be "baseline", "feature", or "all".
    /// By default, the flag is passed to warmup, baseline, and feature runs.
    /// When "baseline" is specified, the flag is NOT passed to warmup OR baseline.
    /// When "feature" is specified, the flag is NOT passed to feature.
    /// When "all" is specified, the flag is NOT passed to any run.
    #[arg(long, value_name = "TARGET")]
    pub disable_startup_sync_state_idle: Option<DisableStartupSyncStateIdle>,
}

impl Args {
    /// Initializes tracing with the configured options.
    pub(crate) fn init_tracing(&self) -> Result<Option<FileWorkerGuard>> {
        let guard = self.logs.init_tracing()?;
        Ok(guard)
    }

    /// Build additional arguments for a specific ref type, conditionally including
    /// --debug.startup-sync-state-idle based on the configuration
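    ///
    /// For example, with `--disable-startup-sync-state-idle baseline`, the "baseline" and
    /// "warmup" ref types receive only the parsed base arguments, while "feature" additionally
    /// receives `--debug.startup-sync-state-idle`.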
    pub(crate) fn build_additional_args(
        &self,
        ref_type: &str,
        base_args_str: Option<&String>,
    ) -> Vec<String> {
        // Parse the base arguments string if provided
        let mut args = base_args_str.map(|s| parse_args_string(s)).unwrap_or_default();

        // Determine if we should add the --debug.startup-sync-state-idle flag
        let should_add_flag = match self.disable_startup_sync_state_idle {
            None => true, // By default, add the flag
            Some(DisableStartupSyncStateIdle::All) => false,
            Some(DisableStartupSyncStateIdle::Baseline) => {
                ref_type != "baseline" && ref_type != "warmup"
            }
            Some(DisableStartupSyncStateIdle::Feature) => ref_type != "feature",
        };

        if should_add_flag {
            args.push("--debug.startup-sync-state-idle".to_string());
            debug!("Adding --debug.startup-sync-state-idle flag for ref_type: {}", ref_type);
        } else {
            debug!("Skipping --debug.startup-sync-state-idle flag for ref_type: {}", ref_type);
        }

        args
    }

    /// Get the default RPC URL for a given chain
    const fn get_default_rpc_url(chain: &Chain) -> &'static str {
        match chain.id() {
            8453 => "https://base-mainnet.rpc.ithaca.xyz",  // base
            84532 => "https://base-sepolia.rpc.ithaca.xyz", // base-sepolia
            560048 => "https://rpc.hoodi.ethpandaops.io",   // hoodi
            _ => "https://reth-ethereum.ithaca.xyz/rpc",    // mainnet and fallback
        }
    }

    /// Get the RPC URL, using chain-specific default if not provided
    pub(crate) fn get_rpc_url(&self) -> String {
        self.rpc_url.clone().unwrap_or_else(|| Self::get_default_rpc_url(&self.chain).to_string())
    }

    /// Get the JWT secret path - either provided or derived from datadir
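    ///
    /// A user-provided path is tilde-expanded; otherwise the path is resolved through
    /// [`DatadirArgs`] as `<datadir>/<chain>/jwt.hex` (with the default datadir on Linux this is
    /// typically a path under `~/.local/share/reth/<chain>/`).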
    pub(crate) fn jwt_secret_path(&self) -> PathBuf {
        match &self.jwt_secret {
            Some(path) => {
                let jwt_secret_str = path.to_string_lossy();
                let expanded = shellexpand::tilde(&jwt_secret_str);
                PathBuf::from(expanded.as_ref())
            }
            None => {
                // Use the same logic as reth: <datadir>/<chain>/jwt.hex
                let chain_path = self.datadir.clone().resolve_datadir(self.chain);
                chain_path.jwt()
            }
        }
    }

    /// Get the resolved datadir path using the chain
    pub(crate) fn datadir_path(&self) -> PathBuf {
        let chain_path = self.datadir.clone().resolve_datadir(self.chain);
        chain_path.data_dir().to_path_buf()
    }

    /// Get the expanded output directory path
    pub(crate) fn output_dir_path(&self) -> PathBuf {
        let expanded = shellexpand::tilde(&self.output_dir);
        PathBuf::from(expanded.as_ref())
    }

    /// Get the effective warmup blocks value - either specified or defaults to blocks
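    ///
    /// E.g. `--blocks 100` without `--warmup-blocks` also warms up over 100 blocks.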
    pub(crate) fn get_warmup_blocks(&self) -> u64 {
        self.warmup_blocks.unwrap_or(self.blocks)
    }
}

/// Validate that the RPC endpoint chain ID matches the specified chain
async fn validate_rpc_chain_id(rpc_url: &str, expected_chain: &Chain) -> Result<()> {
    // Create Alloy provider
    let url = rpc_url.parse().map_err(|e| eyre!("Invalid RPC URL '{}': {}", rpc_url, e))?;
    let provider = ProviderBuilder::new().connect_http(url);

    // Query chain ID using Alloy
    let rpc_chain_id = provider
        .get_chain_id()
        .await
        .map_err(|e| eyre!("Failed to get chain ID from RPC endpoint {}: {:?}", rpc_url, e))?;

    let expected_chain_id = expected_chain.id();

    if rpc_chain_id != expected_chain_id {
        return Err(eyre!(
            "RPC endpoint chain ID mismatch!\n\
            Expected: {} (chain: {})\n\
            Found: {} at RPC endpoint: {}\n\n\
            Please use an RPC endpoint for the correct network or change the --chain argument.",
            expected_chain_id,
            expected_chain,
            rpc_chain_id,
            rpc_url
        ));
    }

    info!("Validated RPC endpoint chain ID");
    Ok(())
}

/// Main comparison workflow execution
pub(crate) async fn run_comparison(args: Args, _ctx: CliContext) -> Result<()> {
    // Create a new process group for this process and all its children
    #[cfg(unix)]
    {
        use nix::unistd::{getpid, setpgid};
        if let Err(e) = setpgid(getpid(), getpid()) {
            warn!("Failed to create process group: {e}");
        }
    }

    info!(
        "Starting benchmark comparison between '{}' and '{}'",
        args.baseline_ref, args.feature_ref
    );

    if args.sudo {
        info!("Running in sudo mode - reth commands will use elevated privileges");
    }

    // Initialize Git manager
    let git_manager = GitManager::new()?;
    // Fetch all branches, tags, and commits
    git_manager.fetch_all()?;

    // Initialize compilation manager
    let output_dir = args.output_dir_path();
    let compilation_manager = CompilationManager::new(
        git_manager.repo_root().to_string(),
        output_dir.clone(),
        git_manager.clone(),
        args.features.clone(),
    )?;
    // Initialize node manager
    let mut node_manager = NodeManager::new(&args);

    let benchmark_runner = BenchmarkRunner::new(&args);
    let mut comparison_generator = ComparisonGenerator::new(&args);

    // Set the comparison directory in node manager to align with results directory
    node_manager.set_comparison_dir(comparison_generator.get_output_dir());

    // Store original git state for restoration
    let original_ref = git_manager.get_current_ref()?;
    info!("Current git reference: {}", original_ref);

    // Validate git state
    if !args.skip_git_validation {
        git_manager.validate_clean_state()?;
        git_manager.validate_refs(&[&args.baseline_ref, &args.feature_ref])?;
    }

    // Validate RPC endpoint chain ID matches the specified chain
    let rpc_url = args.get_rpc_url();
    validate_rpc_chain_id(&rpc_url, &args.chain).await?;

    // Setup signal handling for cleanup
    let git_manager_cleanup = git_manager.clone();
    let original_ref_cleanup = original_ref.clone();
    ctrlc::set_handler(move || {
        eprintln!("Received interrupt signal, cleaning up...");

        // Send SIGTERM to entire process group to ensure all children exit
        #[cfg(unix)]
        {
            use nix::{
                sys::signal::{kill, Signal},
                unistd::Pid,
            };

            // Send SIGTERM to our process group (negative PID = process group)
            let current_pid = std::process::id() as i32;
            let pgid = Pid::from_raw(-current_pid);
            if let Err(e) = kill(pgid, Signal::SIGTERM) {
                eprintln!("Failed to send SIGTERM to process group: {e}");
            }
        }

        // Give a moment for any ongoing git operations to complete
        std::thread::sleep(std::time::Duration::from_millis(200));

        if let Err(e) = git_manager_cleanup.switch_ref(&original_ref_cleanup) {
            eprintln!("Failed to restore original git reference: {e}");
            eprintln!("You may need to manually run: git checkout {original_ref_cleanup}");
        }
        std::process::exit(1);
    })?;

    let result = run_benchmark_workflow(
        &git_manager,
        &compilation_manager,
        &mut node_manager,
        &benchmark_runner,
        &mut comparison_generator,
        &args,
    )
    .await;

    // Always restore original git reference
    info!("Restoring original git reference: {}", original_ref);
    git_manager.switch_ref(&original_ref)?;

    // Handle any errors from the workflow
    result?;

    Ok(())
}

/// Parse a string of arguments into a vector of strings
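///
/// Shell-style quoting is honored via `shlex`, e.g. `--debug.tip 0xabc --full` becomes
/// `["--debug.tip", "0xabc", "--full"]`, and a quoted value such as `'bench run.log'` is kept
/// as a single argument.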
fn parse_args_string(args_str: &str) -> Vec<String> {
    shlex::split(args_str).unwrap_or_else(|| {
        // Fallback to simple whitespace splitting if shlex fails
        args_str.split_whitespace().map(|s| s.to_string()).collect()
    })
}

/// Run compilation phase for both baseline and feature binaries
async fn run_compilation_phase(
    git_manager: &GitManager,
    compilation_manager: &CompilationManager,
    args: &Args,
    is_optimism: bool,
) -> Result<(String, String)> {
    info!("=== Running compilation phase ===");

    // Ensure required tools are available (only need to check once)
    compilation_manager.ensure_reth_bench_available()?;
    if args.profile {
        compilation_manager.ensure_samply_available()?;
    }
    let refs = [&args.baseline_ref, &args.feature_ref];
    let ref_types = ["baseline", "feature"];

    // First, resolve every ref to a commit up front so that both compilation runs use a
    // consistent commit even if a ref is pushed to (updated) mid-run.
    let mut ref_commits = std::collections::HashMap::new();
    for &git_ref in &refs {
        if !ref_commits.contains_key(git_ref) {
            git_manager.switch_ref(git_ref)?;
            let commit = git_manager.get_current_commit()?;
            ref_commits.insert(git_ref.clone(), commit);
            info!("Reference {} resolves to commit: {}", git_ref, &ref_commits[git_ref][..8]);
        }
    }

    // Now compile each ref using the resolved commits
    for (i, &git_ref) in refs.iter().enumerate() {
        let ref_type = ref_types[i];
        let commit = &ref_commits[git_ref];

        info!(
            "Compiling {} binary for reference: {} (commit: {})",
            ref_type,
            git_ref,
            &commit[..8]
        );

        // Switch to target reference
        git_manager.switch_ref(git_ref)?;

        // Compile reth (with caching)
        compilation_manager.compile_reth(commit, is_optimism)?;

        info!("Completed compilation for {} reference", ref_type);
    }

    let baseline_commit = ref_commits[&args.baseline_ref].clone();
    let feature_commit = ref_commits[&args.feature_ref].clone();

    info!("Compilation phase completed");
    Ok((baseline_commit, feature_commit))
}

/// Run warmup phase to warm up caches before benchmarking
async fn run_warmup_phase(
    git_manager: &GitManager,
    compilation_manager: &CompilationManager,
    node_manager: &mut NodeManager,
    benchmark_runner: &BenchmarkRunner,
    args: &Args,
    is_optimism: bool,
    baseline_commit: &str,
) -> Result<()> {
    info!("=== Running warmup phase ===");

    // Use baseline for warmup
    let warmup_ref = &args.baseline_ref;

    // Switch to baseline reference
    git_manager.switch_ref(warmup_ref)?;

    // Get the cached binary path for baseline (should already be compiled)
    let binary_path =
        compilation_manager.get_cached_binary_path_for_commit(baseline_commit, is_optimism);

    // Verify the cached binary exists
    if !binary_path.exists() {
        return Err(eyre!(
            "Cached baseline binary not found at {:?}. Compilation phase should have created it.",
            binary_path
        ));
    }

    info!("Using cached baseline binary for warmup (commit: {})", &baseline_commit[..8]);

    // Build additional args with conditional --debug.startup-sync-state-idle flag
    let additional_args = args.build_additional_args("warmup", args.baseline_args.as_ref());

    // Start reth node for warmup
    let mut node_process =
        node_manager.start_node(&binary_path, warmup_ref, "warmup", &additional_args).await?;

    // Wait for node to be ready and get its current tip
    let current_tip = node_manager.wait_for_node_ready_and_get_tip().await?;
    info!("Warmup node is ready at tip: {}", current_tip);

    // Store the tip we'll unwind back to
    let original_tip = current_tip;

    // Clear filesystem caches before warmup run only (unless disabled)
    if args.no_clear_cache {
        info!("Skipping filesystem cache clearing (--no-clear-cache flag set)");
    } else {
        BenchmarkRunner::clear_fs_caches().await?;
    }

    // Run warmup to warm up caches
    benchmark_runner.run_warmup(current_tip).await?;

    // Stop node before unwinding (node must be stopped to release database lock)
    node_manager.stop_node(&mut node_process).await?;

    // Unwind back to starting block after warmup
    node_manager.unwind_to_block(original_tip).await?;

    info!("Warmup phase completed");
    Ok(())
}

/// Execute the complete benchmark workflow for both branches
async fn run_benchmark_workflow(
    git_manager: &GitManager,
    compilation_manager: &CompilationManager,
    node_manager: &mut NodeManager,
    benchmark_runner: &BenchmarkRunner,
    comparison_generator: &mut ComparisonGenerator,
    args: &Args,
) -> Result<()> {
    // Detect if this is an Optimism chain once at the beginning
    let rpc_url = args.get_rpc_url();
    let is_optimism = compilation_manager.detect_optimism_chain(&rpc_url).await?;

    // Run compilation phase for both binaries
    let (baseline_commit, feature_commit) =
        run_compilation_phase(git_manager, compilation_manager, args, is_optimism).await?;

    // Run warmup phase before benchmarking (skip if warmup_blocks is 0)
    if args.get_warmup_blocks() > 0 {
        run_warmup_phase(
            git_manager,
            compilation_manager,
            node_manager,
            benchmark_runner,
            args,
            is_optimism,
            &baseline_commit,
        )
        .await?;
    } else {
        info!("Skipping warmup phase (warmup_blocks is 0)");
    }

    let refs = [&args.baseline_ref, &args.feature_ref];
    let ref_types = ["baseline", "feature"];
    let commits = [&baseline_commit, &feature_commit];

    for (i, &git_ref) in refs.iter().enumerate() {
        let ref_type = ref_types[i];
        let commit = commits[i];
        info!("=== Processing {} reference: {} ===", ref_type, git_ref);

        // Switch to target reference
        git_manager.switch_ref(git_ref)?;

        // Get the cached binary path for this git reference (should already be compiled)
        let binary_path =
            compilation_manager.get_cached_binary_path_for_commit(commit, is_optimism);

        // Verify the cached binary exists
        if !binary_path.exists() {
            return Err(eyre!(
                "Cached {} binary not found at {:?}. Compilation phase should have created it.",
                ref_type,
                binary_path
            ));
        }

        info!("Using cached {} binary (commit: {})", ref_type, &commit[..8]);

        // Get reference-specific base arguments string
        let base_args_str = match ref_type {
            "baseline" => args.baseline_args.as_ref(),
            "feature" => args.feature_args.as_ref(),
            _ => None,
        };

        // Build additional args with conditional --debug.startup-sync-state-idle flag
        let additional_args = args.build_additional_args(ref_type, base_args_str);

        // Start reth node
        let mut node_process =
            node_manager.start_node(&binary_path, git_ref, ref_type, &additional_args).await?;

        // Wait for node to be ready and get its current tip (wherever it is)
        let current_tip = node_manager.wait_for_node_ready_and_get_tip().await?;
        info!("Node is ready at tip: {}", current_tip);

        // Store the tip we'll unwind back to
        let original_tip = current_tip;

        // Calculate the benchmark range.
        // Note: reth-bench consumes the first block of the range (an off-by-one quirk), so the
        // range is extended by one block to benchmark exactly `args.blocks` blocks.
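        // E.g. with a tip of 1_000_000 and `--blocks 100`, reth-bench is handed the range
        // 1_000_000..=1_000_100 and ends up benchmarking exactly 100 blocks.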
        let from_block = original_tip;
        let to_block = original_tip + args.blocks;

        // Run benchmark
        let output_dir = comparison_generator.get_ref_output_dir(ref_type);

        // Capture start timestamp for the benchmark run
        let benchmark_start = chrono::Utc::now();

        // Run benchmark (comparison logic is handled separately by ComparisonGenerator)
        benchmark_runner.run_benchmark(from_block, to_block, &output_dir).await?;

        // Capture end timestamp for the benchmark run
        let benchmark_end = chrono::Utc::now();

        // Stop node
        node_manager.stop_node(&mut node_process).await?;

        // Unwind back to original tip
        node_manager.unwind_to_block(original_tip).await?;

        // Store results for comparison
        comparison_generator.add_ref_results(ref_type, &output_dir)?;

        // Set the benchmark run timestamps
        comparison_generator.set_ref_timestamps(ref_type, benchmark_start, benchmark_end)?;

        info!("Completed {} reference benchmark", ref_type);
    }

    // Generate comparison report
    comparison_generator.generate_comparison_report().await?;

    // Generate charts if requested
    if args.draw {
        generate_comparison_charts(comparison_generator).await?;
    }

    // Start samply servers if profiling was enabled
    if args.profile {
        start_samply_servers(args).await?;
    }

    Ok(())
}

/// Generate comparison charts using the Python script
async fn generate_comparison_charts(comparison_generator: &ComparisonGenerator) -> Result<()> {
    info!("Generating comparison charts with Python script...");

    let baseline_output_dir = comparison_generator.get_ref_output_dir("baseline");
    let feature_output_dir = comparison_generator.get_ref_output_dir("feature");

    let baseline_csv = baseline_output_dir.join("combined_latency.csv");
    let feature_csv = feature_output_dir.join("combined_latency.csv");

    // Check if CSV files exist
    if !baseline_csv.exists() {
        return Err(eyre!("Baseline CSV not found: {:?}", baseline_csv));
    }
    if !feature_csv.exists() {
        return Err(eyre!("Feature CSV not found: {:?}", feature_csv));
    }

    let output_dir = comparison_generator.get_output_dir();
    let chart_output = output_dir.join("latency_comparison.png");

    let script_path = "bin/reth-bench/scripts/compare_newpayload_latency.py";

    info!("Running Python comparison script with uv...");
    let mut cmd = Command::new("uv");
    cmd.args([
        "run",
        script_path,
        &baseline_csv.to_string_lossy(),
        &feature_csv.to_string_lossy(),
        "-o",
        &chart_output.to_string_lossy(),
    ]);

    // Set process group for consistent signal handling
    #[cfg(unix)]
    {
        cmd.process_group(0);
    }

    let output = cmd.output().await.map_err(|e| {
        eyre!("Failed to execute Python script with uv: {}. Make sure uv is installed.", e)
    })?;

    if !output.status.success() {
        let stderr = String::from_utf8_lossy(&output.stderr);
        let stdout = String::from_utf8_lossy(&output.stdout);
        return Err(eyre!(
            "Python script failed with exit code {:?}:\nstdout: {}\nstderr: {}",
            output.status.code(),
            stdout,
            stderr
        ));
    }

    let stdout = String::from_utf8_lossy(&output.stdout);
    if !stdout.trim().is_empty() {
        info!("Python script output:\n{}", stdout);
    }

    info!("Comparison chart generated: {:?}", chart_output);
    Ok(())
}

/// Start samply servers for viewing profiles
async fn start_samply_servers(args: &Args) -> Result<()> {
    info!("Starting samply servers for profile viewing...");

    let output_dir = args.output_dir_path();
    let profiles_dir = output_dir.join("profiles");

    // Build profile paths
    let baseline_profile = profiles_dir.join("baseline.json.gz");
    let feature_profile = profiles_dir.join("feature.json.gz");

    // Check if profiles exist
    if !baseline_profile.exists() {
        warn!("Baseline profile not found: {:?}", baseline_profile);
        return Ok(());
    }
    if !feature_profile.exists() {
        warn!("Feature profile not found: {:?}", feature_profile);
        return Ok(());
    }

    // Find two consecutive available ports starting from 3000
    let (baseline_port, feature_port) = find_consecutive_ports(3000)?;
    info!("Found available ports: {} and {}", baseline_port, feature_port);

    // Get samply path
    let samply_path = get_samply_path().await?;

    // Start baseline server
    info!("Starting samply server for baseline '{}' on port {}", args.baseline_ref, baseline_port);
    let mut baseline_cmd = Command::new(&samply_path);
    baseline_cmd
        .args(["load", "--port", &baseline_port.to_string(), &baseline_profile.to_string_lossy()])
        .kill_on_drop(true);

    // Set process group for consistent signal handling
    #[cfg(unix)]
    {
        baseline_cmd.process_group(0);
    }

    // Conditionally pipe output based on log level
    if tracing::enabled!(tracing::Level::DEBUG) {
        baseline_cmd.stdout(std::process::Stdio::piped()).stderr(std::process::Stdio::piped());
    } else {
        baseline_cmd.stdout(std::process::Stdio::null()).stderr(std::process::Stdio::null());
    }

    // Debug log the command
    debug!("Executing samply load command: {:?}", baseline_cmd);

    let mut baseline_child =
        baseline_cmd.spawn().wrap_err("Failed to start samply server for baseline")?;

    // Stream baseline samply output if debug logging is enabled
    if tracing::enabled!(tracing::Level::DEBUG) {
        if let Some(stdout) = baseline_child.stdout.take() {
            tokio::spawn(async move {
                use tokio::io::{AsyncBufReadExt, BufReader};
                let reader = BufReader::new(stdout);
                let mut lines = reader.lines();
                while let Ok(Some(line)) = lines.next_line().await {
                    debug!("[SAMPLY-BASELINE] {}", line);
                }
            });
        }

        if let Some(stderr) = baseline_child.stderr.take() {
            tokio::spawn(async move {
                use tokio::io::{AsyncBufReadExt, BufReader};
                let reader = BufReader::new(stderr);
                let mut lines = reader.lines();
                while let Ok(Some(line)) = lines.next_line().await {
                    debug!("[SAMPLY-BASELINE] {}", line);
                }
            });
        }
    }

    // Start feature server
    info!("Starting samply server for feature '{}' on port {}", args.feature_ref, feature_port);
    let mut feature_cmd = Command::new(&samply_path);
    feature_cmd
        .args(["load", "--port", &feature_port.to_string(), &feature_profile.to_string_lossy()])
        .kill_on_drop(true);

    // Set process group for consistent signal handling
    #[cfg(unix)]
    {
        feature_cmd.process_group(0);
    }

    // Conditionally pipe output based on log level
    if tracing::enabled!(tracing::Level::DEBUG) {
        feature_cmd.stdout(std::process::Stdio::piped()).stderr(std::process::Stdio::piped());
    } else {
        feature_cmd.stdout(std::process::Stdio::null()).stderr(std::process::Stdio::null());
    }

    // Debug log the command
    debug!("Executing samply load command: {:?}", feature_cmd);

    let mut feature_child =
        feature_cmd.spawn().wrap_err("Failed to start samply server for feature")?;

    // Stream feature samply output if debug logging is enabled
    if tracing::enabled!(tracing::Level::DEBUG) {
        if let Some(stdout) = feature_child.stdout.take() {
            tokio::spawn(async move {
                use tokio::io::{AsyncBufReadExt, BufReader};
                let reader = BufReader::new(stdout);
                let mut lines = reader.lines();
                while let Ok(Some(line)) = lines.next_line().await {
                    debug!("[SAMPLY-FEATURE] {}", line);
                }
            });
        }

        if let Some(stderr) = feature_child.stderr.take() {
            tokio::spawn(async move {
                use tokio::io::{AsyncBufReadExt, BufReader};
                let reader = BufReader::new(stderr);
                let mut lines = reader.lines();
                while let Ok(Some(line)) = lines.next_line().await {
                    debug!("[SAMPLY-FEATURE] {}", line);
                }
            });
        }
    }

    // Give servers time to start
    tokio::time::sleep(std::time::Duration::from_secs(2)).await;

    // Print access information
    println!("\n=== SAMPLY PROFILE SERVERS STARTED ===");
    println!("Baseline '{}': http://127.0.0.1:{}", args.baseline_ref, baseline_port);
    println!("Feature  '{}': http://127.0.0.1:{}", args.feature_ref, feature_port);
    println!("\nOpen the URLs in your browser to view the profiles.");
    println!("Press Ctrl+C to stop the servers and exit.");
    println!("=========================================\n");

    // Wait for Ctrl+C or process termination
    let ctrl_c = tokio::signal::ctrl_c();
    let baseline_wait = baseline_child.wait();
    let feature_wait = feature_child.wait();

    tokio::select! {
        _ = ctrl_c => {
            info!("Received Ctrl+C, shutting down samply servers...");
        }
        result = baseline_wait => {
            match result {
                Ok(status) => info!("Baseline samply server exited with status: {}", status),
                Err(e) => warn!("Baseline samply server error: {}", e),
            }
        }
        result = feature_wait => {
            match result {
                Ok(status) => info!("Feature samply server exited with status: {}", status),
                Err(e) => warn!("Feature samply server error: {}", e),
            }
        }
    }

    // Ensure both processes are terminated
    let _ = baseline_child.kill().await;
    let _ = feature_child.kill().await;

    info!("Samply servers stopped.");
    Ok(())
}

/// Find two consecutive available ports starting from the given port
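///
/// E.g. if 3000 is in use but 3001 and 3002 are free, this returns `(3001, 3002)`.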
fn find_consecutive_ports(start_port: u16) -> Result<(u16, u16)> {
    for port in start_port..=65533 {
        // Check if both port and port+1 are available
        if is_port_available(port) && is_port_available(port + 1) {
            return Ok((port, port + 1));
        }
    }
    Err(eyre!("Could not find two consecutive available ports starting from {}", start_port))
}

/// Check if a port is available by attempting to bind to it
fn is_port_available(port: u16) -> bool {
    TcpListener::bind(("127.0.0.1", port)).is_ok()
}

/// Get the absolute path to samply using 'which' command
async fn get_samply_path() -> Result<String> {
    let output = Command::new("which")
        .arg("samply")
        .output()
        .await
        .wrap_err("Failed to execute 'which samply' command")?;

    if !output.status.success() {
        return Err(eyre!("samply not found in PATH"));
    }

    let samply_path = String::from_utf8(output.stdout)
        .wrap_err("samply path is not valid UTF-8")?
        .trim()
        .to_string();

    if samply_path.is_empty() {
        return Err(eyre!("which samply returned empty path"));
    }

    Ok(samply_path)
}
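
// Illustrative unit tests for the self-contained helpers above (value parsing, argument
// splitting, and port probing). They are a minimal sketch of the documented behavior and do
// not touch git, the node, or the network beyond binding local ports.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn disable_target_parses_case_insensitively() {
        assert_eq!(
            "Baseline".parse::<DisableStartupSyncStateIdle>().unwrap(),
            DisableStartupSyncStateIdle::Baseline
        );
        assert_eq!(DisableStartupSyncStateIdle::All.to_string(), "all");
        assert!("everything".parse::<DisableStartupSyncStateIdle>().is_err());
    }

    #[test]
    fn parse_args_string_honors_shell_quoting() {
        assert_eq!(
            parse_args_string("--debug.tip 0xabc --log.file 'bench run.log'"),
            vec!["--debug.tip", "0xabc", "--log.file", "bench run.log"]
        );
    }

    #[test]
    fn consecutive_ports_are_adjacent() {
        // Availability depends on the host, so only the invariant is checked here.
        let (first, second) = find_consecutive_ports(3000).expect("two free ports");
        assert_eq!(second, first + 1);
    }
}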