1use alloy_provider::{Provider, ProviderBuilder};
4use clap::Parser;
5use eyre::{eyre, Result, WrapErr};
6use reth_chainspec::Chain;
7use reth_cli_runner::CliContext;
8use reth_node_core::args::{DatadirArgs, LogArgs, TraceArgs};
9use reth_tracing::FileWorkerGuard;
10use std::{net::TcpListener, path::PathBuf, str::FromStr};
11use tokio::process::Command;
12use tracing::{debug, info, warn};
13
14use crate::{
15 benchmark::BenchmarkRunner, comparison::ComparisonGenerator, compilation::CompilationManager,
16 git::GitManager, node::NodeManager,
17};
18
19#[derive(Debug, Clone, Copy, PartialEq, Eq)]
21pub(crate) enum DisableStartupSyncStateIdle {
22 Baseline,
24 Feature,
26 All,
28}
29
30impl FromStr for DisableStartupSyncStateIdle {
31 type Err = String;
32
33 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
34 match s.to_lowercase().as_str() {
35 "baseline" => Ok(Self::Baseline),
36 "feature" => Ok(Self::Feature),
37 "all" => Ok(Self::All),
38 _ => Err(format!("Invalid value '{}'. Expected 'baseline', 'feature', or 'all'", s)),
39 }
40 }
41}
42
43impl std::fmt::Display for DisableStartupSyncStateIdle {
44 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
45 match self {
46 Self::Baseline => write!(f, "baseline"),
47 Self::Feature => write!(f, "feature"),
48 Self::All => write!(f, "all"),
49 }
50 }
51}
52
53#[derive(Debug, Parser)]
55#[command(
56 name = "reth-bench-compare",
57 about = "Compare reth performance between two git references (branches or tags)",
58 version
59)]
60pub(crate) struct Args {
61 #[arg(long, value_name = "REF")]
63 pub baseline_ref: String,
64
65 #[arg(long, value_name = "REF")]
67 pub feature_ref: String,
68
69 #[command(flatten)]
70 pub datadir: DatadirArgs,
71
72 #[arg(long, value_name = "N", default_value = "100")]
74 pub blocks: u64,
75
76 #[arg(long, value_name = "URL")]
78 pub rpc_url: Option<String>,
79
80 #[arg(long, value_name = "PATH")]
85 pub jwt_secret: Option<PathBuf>,
86
87 #[arg(long, value_name = "PATH", default_value = "./reth-bench-compare")]
89 pub output_dir: String,
90
91 #[arg(long)]
93 pub skip_git_validation: bool,
94
95 #[arg(long, value_name = "PORT", default_value = "5005")]
97 pub metrics_port: u16,
98
99 #[arg(long, value_name = "CHAIN", default_value = "mainnet", required = false)]
103 pub chain: Chain,
104
105 #[arg(long)]
107 pub sudo: bool,
108
109 #[arg(long)]
111 pub draw: bool,
112
113 #[arg(long)]
115 pub profile: bool,
116
117 #[arg(long, value_name = "DURATION", hide = true)]
122 pub wait_time: Option<String>,
123
124 #[arg(long)]
130 pub wait_for_persistence: bool,
131
132 #[arg(long, value_name = "PERSISTENCE_THRESHOLD")]
138 pub persistence_threshold: Option<u64>,
139
140 #[arg(long, value_name = "N")]
143 pub warmup_blocks: Option<u64>,
144
145 #[arg(long)]
148 pub no_clear_cache: bool,
149
150 #[arg(long)]
153 pub skip_wait_syncing: bool,
154
155 #[command(flatten)]
156 pub logs: LogArgs,
157
158 #[command(flatten)]
159 pub traces: TraceArgs,
160
161 #[arg(
164 long,
165 value_name = "OTLP_BUFFER_SIZE",
166 default_value = "32768",
167 help_heading = "Tracing"
168 )]
169 pub otlp_max_queue_size: usize,
170
171 #[arg(long, value_name = "ARGS")]
175 pub baseline_args: Option<String>,
176
177 #[arg(long, value_name = "ARGS")]
181 pub feature_args: Option<String>,
182
183 #[arg(trailing_var_arg = true, allow_hyphen_values = true)]
189 pub reth_args: Vec<String>,
190
191 #[arg(long, value_name = "FEATURES", default_value = "jemalloc,asm-keccak")]
195 pub features: String,
196
197 #[arg(long, value_name = "FEATURES")]
201 pub baseline_features: Option<String>,
202
203 #[arg(long, value_name = "FEATURES")]
207 pub feature_features: Option<String>,
208
209 #[arg(long, value_name = "FLAGS", default_value = "-C target-cpu=native")]
213 pub rustflags: String,
214
215 #[arg(long, value_name = "FLAGS")]
219 pub baseline_rustflags: Option<String>,
220
221 #[arg(long, value_name = "FLAGS")]
225 pub feature_rustflags: Option<String>,
226
227 #[arg(long, value_name = "TARGET")]
234 pub disable_startup_sync_state_idle: Option<DisableStartupSyncStateIdle>,
235}
236
237impl Args {
238 pub(crate) fn init_tracing(&self) -> Result<Option<FileWorkerGuard>> {
240 let guard = self.logs.init_tracing()?;
241 Ok(guard)
242 }
243
244 pub(crate) fn build_additional_args(
247 &self,
248 ref_type: &str,
249 base_args_str: Option<&String>,
250 ) -> Vec<String> {
251 let mut args = base_args_str.map(|s| parse_args_string(s)).unwrap_or_default();
253
254 let should_add_flag = match self.disable_startup_sync_state_idle {
256 None => true, Some(DisableStartupSyncStateIdle::All) => false,
258 Some(DisableStartupSyncStateIdle::Baseline) => {
259 ref_type != "baseline" && ref_type != "warmup"
260 }
261 Some(DisableStartupSyncStateIdle::Feature) => ref_type != "feature",
262 };
263
264 if should_add_flag {
265 args.push("--debug.startup-sync-state-idle".to_string());
266 debug!("Adding --debug.startup-sync-state-idle flag for ref_type: {}", ref_type);
267 } else {
268 debug!("Skipping --debug.startup-sync-state-idle flag for ref_type: {}", ref_type);
269 }
270
271 args
272 }
273
274 const fn get_default_rpc_url(chain: &Chain) -> &'static str {
276 match chain.id() {
277 8453 => "https://base-mainnet.rpc.ithaca.xyz", 84532 => "https://base-sepolia.rpc.ithaca.xyz", 27082 => "https://rpc.hoodi.ethpandaops.io", _ => "https://reth-ethereum.ithaca.xyz/rpc", }
282 }
283
284 pub(crate) fn get_rpc_url(&self) -> String {
286 self.rpc_url.clone().unwrap_or_else(|| Self::get_default_rpc_url(&self.chain).to_string())
287 }
288
289 pub(crate) fn jwt_secret_path(&self) -> PathBuf {
291 match &self.jwt_secret {
292 Some(path) => {
293 let jwt_secret_str = path.to_string_lossy();
294 let expanded = shellexpand::tilde(&jwt_secret_str);
295 PathBuf::from(expanded.as_ref())
296 }
297 None => {
298 let chain_path = self.datadir.clone().resolve_datadir(self.chain);
300 chain_path.jwt()
301 }
302 }
303 }
304
305 pub(crate) fn datadir_path(&self) -> PathBuf {
307 let chain_path = self.datadir.clone().resolve_datadir(self.chain);
308 chain_path.data_dir().to_path_buf()
309 }
310
311 pub(crate) fn output_dir_path(&self) -> PathBuf {
313 let expanded = shellexpand::tilde(&self.output_dir);
314 PathBuf::from(expanded.as_ref())
315 }
316
317 pub(crate) fn get_warmup_blocks(&self) -> u64 {
319 self.warmup_blocks.unwrap_or(self.blocks)
320 }
321}
322
323async fn validate_rpc_chain_id(rpc_url: &str, expected_chain: &Chain) -> Result<()> {
325 let url = rpc_url.parse().map_err(|e| eyre!("Invalid RPC URL '{}': {}", rpc_url, e))?;
327 let provider = ProviderBuilder::new().connect_http(url);
328
329 let rpc_chain_id = provider
331 .get_chain_id()
332 .await
333 .map_err(|e| eyre!("Failed to get chain ID from RPC endpoint {}: {:?}", rpc_url, e))?;
334
335 let expected_chain_id = expected_chain.id();
336
337 if rpc_chain_id != expected_chain_id {
338 return Err(eyre!(
339 "RPC endpoint chain ID mismatch!\n\
340 Expected: {} (chain: {})\n\
341 Found: {} at RPC endpoint: {}\n\n\
342 Please use an RPC endpoint for the correct network or change the --chain argument.",
343 expected_chain_id,
344 expected_chain,
345 rpc_chain_id,
346 rpc_url
347 ));
348 }
349
350 info!("Validated RPC endpoint chain ID");
351 Ok(())
352}
353
354pub(crate) async fn run_comparison(args: Args, _ctx: CliContext) -> Result<()> {
356 #[cfg(unix)]
358 {
359 use nix::unistd::{getpid, setpgid};
360 if let Err(e) = setpgid(getpid(), getpid()) {
361 warn!("Failed to create process group: {e}");
362 }
363 }
364
365 info!(
366 "Starting benchmark comparison between '{}' and '{}'",
367 args.baseline_ref, args.feature_ref
368 );
369
370 if args.sudo {
371 info!("Running in sudo mode - reth commands will use elevated privileges");
372 }
373
374 let git_manager = GitManager::new()?;
376 git_manager.fetch_all()?;
378
379 let output_dir = args.output_dir_path();
381 let compilation_manager = CompilationManager::new(
382 git_manager.repo_root().to_string(),
383 output_dir.clone(),
384 git_manager.clone(),
385 )?;
386 let mut node_manager = NodeManager::new(&args);
388
389 let benchmark_runner = BenchmarkRunner::new(&args);
390 let mut comparison_generator = ComparisonGenerator::new(&args);
391
392 node_manager.set_comparison_dir(comparison_generator.get_output_dir());
394
395 let original_ref = git_manager.get_current_ref()?;
397 info!("Current git reference: {}", original_ref);
398
399 if !args.skip_git_validation {
401 git_manager.validate_clean_state()?;
402 git_manager.validate_refs(&[&args.baseline_ref, &args.feature_ref])?;
403 }
404
405 let rpc_url = args.get_rpc_url();
407 validate_rpc_chain_id(&rpc_url, &args.chain).await?;
408
409 let git_manager_cleanup = git_manager.clone();
411 let original_ref_cleanup = original_ref.clone();
412 ctrlc::set_handler(move || {
413 eprintln!("Received interrupt signal, cleaning up...");
414
415 #[cfg(unix)]
417 {
418 use nix::{
419 sys::signal::{kill, Signal},
420 unistd::Pid,
421 };
422
423 let current_pid = std::process::id() as i32;
425 let pgid = Pid::from_raw(-current_pid);
426 if let Err(e) = kill(pgid, Signal::SIGTERM) {
427 eprintln!("Failed to send SIGTERM to process group: {e}");
428 }
429 }
430
431 std::thread::sleep(std::time::Duration::from_millis(200));
433
434 if let Err(e) = git_manager_cleanup.switch_ref(&original_ref_cleanup) {
435 eprintln!("Failed to restore original git reference: {e}");
436 eprintln!("You may need to manually run: git checkout {original_ref_cleanup}");
437 }
438 std::process::exit(1);
439 })?;
440
441 let result = run_benchmark_workflow(
442 &git_manager,
443 &compilation_manager,
444 &mut node_manager,
445 &benchmark_runner,
446 &mut comparison_generator,
447 &args,
448 )
449 .await;
450
451 info!("Restoring original git reference: {}", original_ref);
453 git_manager.switch_ref(&original_ref)?;
454
455 result?;
457
458 Ok(())
459}
460
461fn parse_args_string(args_str: &str) -> Vec<String> {
463 shlex::split(args_str).unwrap_or_else(|| {
464 args_str.split_whitespace().map(|s| s.to_string()).collect()
466 })
467}
468
469async fn run_compilation_phase(
471 git_manager: &GitManager,
472 compilation_manager: &CompilationManager,
473 args: &Args,
474 is_optimism: bool,
475) -> Result<(String, String)> {
476 info!("=== Running compilation phase ===");
477
478 compilation_manager.ensure_reth_bench_available()?;
480 if args.profile {
481 compilation_manager.ensure_samply_available()?;
482 }
483
484 let refs = [&args.baseline_ref, &args.feature_ref];
485 let ref_types = ["baseline", "feature"];
486
487 let mut ref_commits = std::collections::HashMap::new();
490 for &git_ref in &refs {
491 if !ref_commits.contains_key(git_ref) {
492 git_manager.switch_ref(git_ref)?;
493 let commit = git_manager.get_current_commit()?;
494 ref_commits.insert(git_ref.clone(), commit);
495 info!("Reference {} resolves to commit: {}", git_ref, &ref_commits[git_ref][..8]);
496 }
497 }
498
499 for (i, &git_ref) in refs.iter().enumerate() {
501 let ref_type = ref_types[i];
502 let commit = &ref_commits[git_ref];
503
504 let features = match ref_type {
506 "baseline" => args.baseline_features.as_ref().unwrap_or(&args.features),
507 "feature" => args.feature_features.as_ref().unwrap_or(&args.features),
508 _ => &args.features,
509 };
510 let rustflags = match ref_type {
511 "baseline" => args.baseline_rustflags.as_ref().unwrap_or(&args.rustflags),
512 "feature" => args.feature_rustflags.as_ref().unwrap_or(&args.rustflags),
513 _ => &args.rustflags,
514 };
515
516 info!(
517 "Compiling {} binary for reference: {} (commit: {})",
518 ref_type,
519 git_ref,
520 &commit[..8]
521 );
522
523 git_manager.switch_ref(git_ref)?;
525
526 compilation_manager.compile_reth(commit, is_optimism, features, rustflags)?;
528
529 info!("Completed compilation for {} reference", ref_type);
530 }
531
532 let baseline_commit = ref_commits[&args.baseline_ref].clone();
533 let feature_commit = ref_commits[&args.feature_ref].clone();
534
535 info!("Compilation phase completed");
536 Ok((baseline_commit, feature_commit))
537}
538
539#[allow(clippy::too_many_arguments)]
540async fn run_warmup_phase(
542 git_manager: &GitManager,
543 compilation_manager: &CompilationManager,
544 node_manager: &mut NodeManager,
545 benchmark_runner: &BenchmarkRunner,
546 args: &Args,
547 is_optimism: bool,
548 baseline_commit: &str,
549 starting_tip: u64,
550) -> Result<()> {
551 info!("=== Running warmup phase ===");
552
553 let warmup_blocks = args.get_warmup_blocks();
555 let unwind_target = starting_tip.saturating_sub(warmup_blocks);
556 node_manager.unwind_to_block(unwind_target).await?;
557
558 let warmup_ref = &args.baseline_ref;
560
561 git_manager.switch_ref(warmup_ref)?;
563
564 let binary_path =
566 compilation_manager.get_cached_binary_path_for_commit(baseline_commit, is_optimism);
567
568 if !binary_path.exists() {
570 return Err(eyre!(
571 "Cached baseline binary not found at {:?}. Compilation phase should have created it.",
572 binary_path
573 ));
574 }
575
576 info!("Using cached baseline binary for warmup (commit: {})", &baseline_commit[..8]);
577
578 let additional_args = args.build_additional_args("warmup", args.baseline_args.as_ref());
580
581 let (mut node_process, _warmup_command) =
583 node_manager.start_node(&binary_path, warmup_ref, "warmup", &additional_args).await?;
584
585 let current_tip = if args.skip_wait_syncing {
587 node_manager.wait_for_rpc_and_get_tip(&mut node_process).await?
588 } else {
589 node_manager.wait_for_node_ready_and_get_tip(&mut node_process).await?
590 };
591 info!("Warmup node is ready at tip: {}", current_tip);
592
593 if args.no_clear_cache {
595 info!("Skipping filesystem cache clearing (--no-clear-cache flag set)");
596 } else {
597 BenchmarkRunner::clear_fs_caches().await?;
598 }
599
600 benchmark_runner.run_warmup(current_tip).await?;
602
603 node_manager.stop_node(&mut node_process).await?;
605
606 info!("Warmup phase completed");
607 Ok(())
608}
609
610async fn run_benchmark_workflow(
612 git_manager: &GitManager,
613 compilation_manager: &CompilationManager,
614 node_manager: &mut NodeManager,
615 benchmark_runner: &BenchmarkRunner,
616 comparison_generator: &mut ComparisonGenerator,
617 args: &Args,
618) -> Result<()> {
619 let rpc_url = args.get_rpc_url();
621 let is_optimism = compilation_manager.detect_optimism_chain(&rpc_url).await?;
622
623 let (baseline_commit, feature_commit) =
625 run_compilation_phase(git_manager, compilation_manager, args, is_optimism).await?;
626
627 git_manager.switch_ref(&args.baseline_ref)?;
629 let binary_path =
630 compilation_manager.get_cached_binary_path_for_commit(&baseline_commit, is_optimism);
631 if !binary_path.exists() {
632 return Err(eyre!(
633 "Cached baseline binary not found at {:?}. Compilation phase should have created it.",
634 binary_path
635 ));
636 }
637
638 info!("=== Determining initial block height ===");
640 let additional_args = args.build_additional_args("baseline", args.baseline_args.as_ref());
641 let (mut node_process, _) = node_manager
642 .start_node(&binary_path, &args.baseline_ref, "baseline", &additional_args)
643 .await?;
644 let starting_tip = if args.skip_wait_syncing {
645 node_manager.wait_for_rpc_and_get_tip(&mut node_process).await?
646 } else {
647 node_manager.wait_for_node_ready_and_get_tip(&mut node_process).await?
648 };
649 info!("Node starting tip: {}", starting_tip);
650 node_manager.stop_node(&mut node_process).await?;
651
652 if args.get_warmup_blocks() > 0 {
654 run_warmup_phase(
655 git_manager,
656 compilation_manager,
657 node_manager,
658 benchmark_runner,
659 args,
660 is_optimism,
661 &baseline_commit,
662 starting_tip,
663 )
664 .await?;
665 } else {
666 info!("Skipping warmup phase (warmup_blocks is 0)");
667 }
668
669 let refs = [&args.baseline_ref, &args.feature_ref];
670 let ref_types = ["baseline", "feature"];
671 let commits = [&baseline_commit, &feature_commit];
672
673 for (i, &git_ref) in refs.iter().enumerate() {
674 let ref_type = ref_types[i];
675 let commit = commits[i];
676 info!("=== Processing {} reference: {} ===", ref_type, git_ref);
677
678 let unwind_target = starting_tip.saturating_sub(args.blocks);
680 node_manager.unwind_to_block(unwind_target).await?;
681
682 git_manager.switch_ref(git_ref)?;
684
685 let binary_path =
687 compilation_manager.get_cached_binary_path_for_commit(commit, is_optimism);
688
689 if !binary_path.exists() {
691 return Err(eyre!(
692 "Cached {} binary not found at {:?}. Compilation phase should have created it.",
693 ref_type,
694 binary_path
695 ));
696 }
697
698 info!("Using cached {} binary (commit: {})", ref_type, &commit[..8]);
699
700 let base_args_str = match ref_type {
702 "baseline" => args.baseline_args.as_ref(),
703 "feature" => args.feature_args.as_ref(),
704 _ => None,
705 };
706
707 let additional_args = args.build_additional_args(ref_type, base_args_str);
709
710 let (mut node_process, reth_command) =
712 node_manager.start_node(&binary_path, git_ref, ref_type, &additional_args).await?;
713
714 let current_tip = if args.skip_wait_syncing {
716 node_manager.wait_for_rpc_and_get_tip(&mut node_process).await?
717 } else {
718 node_manager.wait_for_node_ready_and_get_tip(&mut node_process).await?
719 };
720 info!("Node is ready at tip: {}", current_tip);
721
722 let from_block = current_tip;
726 let to_block = current_tip + args.blocks;
727
728 let output_dir = comparison_generator.get_ref_output_dir(ref_type);
730
731 let benchmark_start = chrono::Utc::now();
733
734 benchmark_runner.run_benchmark(from_block, to_block, &output_dir).await?;
736
737 let benchmark_end = chrono::Utc::now();
739
740 node_manager.stop_node(&mut node_process).await?;
742
743 comparison_generator.add_ref_results(ref_type, &output_dir)?;
745
746 comparison_generator.set_ref_timestamps(ref_type, benchmark_start, benchmark_end)?;
748 comparison_generator.set_ref_command(ref_type, reth_command)?;
749
750 info!("Completed {} reference benchmark", ref_type);
751 }
752
753 comparison_generator.generate_comparison_report().await?;
755
756 if args.draw {
758 generate_comparison_charts(comparison_generator).await?;
759 }
760
761 if args.profile {
763 start_samply_servers(args).await?;
764 }
765
766 Ok(())
767}
768
769async fn generate_comparison_charts(comparison_generator: &ComparisonGenerator) -> Result<()> {
771 info!("Generating comparison charts with Python script...");
772
773 let baseline_output_dir = comparison_generator.get_ref_output_dir("baseline");
774 let feature_output_dir = comparison_generator.get_ref_output_dir("feature");
775
776 let baseline_csv = baseline_output_dir.join("combined_latency.csv");
777 let feature_csv = feature_output_dir.join("combined_latency.csv");
778
779 if !baseline_csv.exists() {
781 return Err(eyre!("Baseline CSV not found: {:?}", baseline_csv));
782 }
783 if !feature_csv.exists() {
784 return Err(eyre!("Feature CSV not found: {:?}", feature_csv));
785 }
786
787 let output_dir = comparison_generator.get_output_dir();
788 let chart_output = output_dir.join("latency_comparison.png");
789
790 let script_path = "bin/reth-bench/scripts/compare_newpayload_latency.py";
791
792 info!("Running Python comparison script with uv...");
793 let mut cmd = Command::new("uv");
794 cmd.args([
795 "run",
796 script_path,
797 &baseline_csv.to_string_lossy(),
798 &feature_csv.to_string_lossy(),
799 "-o",
800 &chart_output.to_string_lossy(),
801 ]);
802
803 #[cfg(unix)]
805 {
806 cmd.process_group(0);
807 }
808
809 let output = cmd.output().await.map_err(|e| {
810 eyre!("Failed to execute Python script with uv: {}. Make sure uv is installed.", e)
811 })?;
812
813 if !output.status.success() {
814 let stderr = String::from_utf8_lossy(&output.stderr);
815 let stdout = String::from_utf8_lossy(&output.stdout);
816 return Err(eyre!(
817 "Python script failed with exit code {:?}:\nstdout: {}\nstderr: {}",
818 output.status.code(),
819 stdout,
820 stderr
821 ));
822 }
823
824 let stdout = String::from_utf8_lossy(&output.stdout);
825 if !stdout.trim().is_empty() {
826 info!("Python script output:\n{}", stdout);
827 }
828
829 info!("Comparison chart generated: {:?}", chart_output);
830 Ok(())
831}
832
833async fn start_samply_servers(args: &Args) -> Result<()> {
835 info!("Starting samply servers for profile viewing...");
836
837 let output_dir = args.output_dir_path();
838 let profiles_dir = output_dir.join("profiles");
839
840 let baseline_profile = profiles_dir.join("baseline.json.gz");
842 let feature_profile = profiles_dir.join("feature.json.gz");
843
844 if !baseline_profile.exists() {
846 warn!("Baseline profile not found: {:?}", baseline_profile);
847 return Ok(());
848 }
849 if !feature_profile.exists() {
850 warn!("Feature profile not found: {:?}", feature_profile);
851 return Ok(());
852 }
853
854 let (baseline_port, feature_port) = find_consecutive_ports(3000)?;
856 info!("Found available ports: {} and {}", baseline_port, feature_port);
857
858 let samply_path = get_samply_path().await?;
860
861 info!("Starting samply server for baseline '{}' on port {}", args.baseline_ref, baseline_port);
863 let mut baseline_cmd = Command::new(&samply_path);
864 baseline_cmd
865 .args(["load", "--port", &baseline_port.to_string(), &baseline_profile.to_string_lossy()])
866 .kill_on_drop(true);
867
868 #[cfg(unix)]
870 {
871 baseline_cmd.process_group(0);
872 }
873
874 if tracing::enabled!(tracing::Level::DEBUG) {
876 baseline_cmd.stdout(std::process::Stdio::piped()).stderr(std::process::Stdio::piped());
877 } else {
878 baseline_cmd.stdout(std::process::Stdio::null()).stderr(std::process::Stdio::null());
879 }
880
881 debug!("Executing samply load command: {:?}", baseline_cmd);
883
884 let mut baseline_child =
885 baseline_cmd.spawn().wrap_err("Failed to start samply server for baseline")?;
886
887 if tracing::enabled!(tracing::Level::DEBUG) {
889 if let Some(stdout) = baseline_child.stdout.take() {
890 tokio::spawn(async move {
891 use tokio::io::{AsyncBufReadExt, BufReader};
892 let reader = BufReader::new(stdout);
893 let mut lines = reader.lines();
894 while let Ok(Some(line)) = lines.next_line().await {
895 debug!("[SAMPLY-BASELINE] {}", line);
896 }
897 });
898 }
899
900 if let Some(stderr) = baseline_child.stderr.take() {
901 tokio::spawn(async move {
902 use tokio::io::{AsyncBufReadExt, BufReader};
903 let reader = BufReader::new(stderr);
904 let mut lines = reader.lines();
905 while let Ok(Some(line)) = lines.next_line().await {
906 debug!("[SAMPLY-BASELINE] {}", line);
907 }
908 });
909 }
910 }
911
912 info!("Starting samply server for feature '{}' on port {}", args.feature_ref, feature_port);
914 let mut feature_cmd = Command::new(&samply_path);
915 feature_cmd
916 .args(["load", "--port", &feature_port.to_string(), &feature_profile.to_string_lossy()])
917 .kill_on_drop(true);
918
919 #[cfg(unix)]
921 {
922 feature_cmd.process_group(0);
923 }
924
925 if tracing::enabled!(tracing::Level::DEBUG) {
927 feature_cmd.stdout(std::process::Stdio::piped()).stderr(std::process::Stdio::piped());
928 } else {
929 feature_cmd.stdout(std::process::Stdio::null()).stderr(std::process::Stdio::null());
930 }
931
932 debug!("Executing samply load command: {:?}", feature_cmd);
934
935 let mut feature_child =
936 feature_cmd.spawn().wrap_err("Failed to start samply server for feature")?;
937
938 if tracing::enabled!(tracing::Level::DEBUG) {
940 if let Some(stdout) = feature_child.stdout.take() {
941 tokio::spawn(async move {
942 use tokio::io::{AsyncBufReadExt, BufReader};
943 let reader = BufReader::new(stdout);
944 let mut lines = reader.lines();
945 while let Ok(Some(line)) = lines.next_line().await {
946 debug!("[SAMPLY-FEATURE] {}", line);
947 }
948 });
949 }
950
951 if let Some(stderr) = feature_child.stderr.take() {
952 tokio::spawn(async move {
953 use tokio::io::{AsyncBufReadExt, BufReader};
954 let reader = BufReader::new(stderr);
955 let mut lines = reader.lines();
956 while let Ok(Some(line)) = lines.next_line().await {
957 debug!("[SAMPLY-FEATURE] {}", line);
958 }
959 });
960 }
961 }
962
963 tokio::time::sleep(std::time::Duration::from_secs(2)).await;
965
966 println!("\n=== SAMPLY PROFILE SERVERS STARTED ===");
968 println!("Baseline '{}': http://127.0.0.1:{}", args.baseline_ref, baseline_port);
969 println!("Feature '{}': http://127.0.0.1:{}", args.feature_ref, feature_port);
970 println!("\nOpen the URLs in your browser to view the profiles.");
971 println!("Press Ctrl+C to stop the servers and exit.");
972 println!("=========================================\n");
973
974 let ctrl_c = tokio::signal::ctrl_c();
976 let baseline_wait = baseline_child.wait();
977 let feature_wait = feature_child.wait();
978
979 tokio::select! {
980 _ = ctrl_c => {
981 info!("Received Ctrl+C, shutting down samply servers...");
982 }
983 result = baseline_wait => {
984 match result {
985 Ok(status) => info!("Baseline samply server exited with status: {}", status),
986 Err(e) => warn!("Baseline samply server error: {}", e),
987 }
988 }
989 result = feature_wait => {
990 match result {
991 Ok(status) => info!("Feature samply server exited with status: {}", status),
992 Err(e) => warn!("Feature samply server error: {}", e),
993 }
994 }
995 }
996
997 let _ = baseline_child.kill().await;
999 let _ = feature_child.kill().await;
1000
1001 info!("Samply servers stopped.");
1002 Ok(())
1003}
1004
1005fn find_consecutive_ports(start_port: u16) -> Result<(u16, u16)> {
1007 for port in start_port..=65533 {
1008 if is_port_available(port) && is_port_available(port + 1) {
1010 return Ok((port, port + 1));
1011 }
1012 }
1013 Err(eyre!("Could not find two consecutive available ports starting from {}", start_port))
1014}
1015
1016fn is_port_available(port: u16) -> bool {
1018 TcpListener::bind(("127.0.0.1", port)).is_ok()
1019}
1020
1021async fn get_samply_path() -> Result<String> {
1023 let output = Command::new("which")
1024 .arg("samply")
1025 .output()
1026 .await
1027 .wrap_err("Failed to execute 'which samply' command")?;
1028
1029 if !output.status.success() {
1030 return Err(eyre!("samply not found in PATH"));
1031 }
1032
1033 let samply_path = String::from_utf8(output.stdout)
1034 .wrap_err("samply path is not valid UTF-8")?
1035 .trim()
1036 .to_string();
1037
1038 if samply_path.is_empty() {
1039 return Err(eyre!("which samply returned empty path"));
1040 }
1041
1042 Ok(samply_path)
1043}