// reth_bench_compare/cli.rs
1//! CLI argument parsing and main command orchestration.
2
3use alloy_provider::{Provider, ProviderBuilder};
4use clap::Parser;
5use eyre::{eyre, Result, WrapErr};
6use reth_chainspec::Chain;
7use reth_cli_runner::CliContext;
8use reth_node_core::args::{DatadirArgs, LogArgs, TraceArgs};
9use reth_tracing::FileWorkerGuard;
10use std::{net::TcpListener, path::PathBuf, str::FromStr};
11use tokio::process::Command;
12use tracing::{debug, info, warn};
13
14use crate::{
15    benchmark::BenchmarkRunner, comparison::ComparisonGenerator, compilation::CompilationManager,
16    git::GitManager, node::NodeManager,
17};
18
/// Target for disabling the --debug.startup-sync-state-idle flag.
///
/// Selects which benchmark run(s) should NOT receive the
/// `--debug.startup-sync-state-idle` node flag; parsed from the CLI value via
/// its `FromStr` implementation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum DisableStartupSyncStateIdle {
    /// Disable for baseline and warmup runs
    Baseline,
    /// Disable for feature runs only
    Feature,
    /// Disable for all runs
    All,
}
29
30impl FromStr for DisableStartupSyncStateIdle {
31    type Err = String;
32
33    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
34        match s.to_lowercase().as_str() {
35            "baseline" => Ok(Self::Baseline),
36            "feature" => Ok(Self::Feature),
37            "all" => Ok(Self::All),
38            _ => Err(format!("Invalid value '{}'. Expected 'baseline', 'feature', or 'all'", s)),
39        }
40    }
41}
42
43impl std::fmt::Display for DisableStartupSyncStateIdle {
44    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
45        match self {
46            Self::Baseline => write!(f, "baseline"),
47            Self::Feature => write!(f, "feature"),
48            Self::All => write!(f, "all"),
49        }
50    }
51}
52
/// Automated reth benchmark comparison between git references.
///
/// All fields are populated by clap from the command line; helper methods on
/// the `impl Args` block derive paths and effective values from them.
#[derive(Debug, Parser)]
#[command(
    name = "reth-bench-compare",
    about = "Compare reth performance between two git references (branches or tags)",
    version
)]
pub(crate) struct Args {
    /// Git reference (branch or tag) to use as baseline for comparison
    #[arg(long, value_name = "REF")]
    pub baseline_ref: String,

    /// Git reference (branch or tag) to compare against the baseline
    #[arg(long, value_name = "REF")]
    pub feature_ref: String,

    #[command(flatten)]
    pub datadir: DatadirArgs,

    /// Number of blocks to benchmark
    #[arg(long, value_name = "N", default_value = "100")]
    pub blocks: u64,

    /// RPC endpoint for fetching block data
    ///
    /// When omitted, a chain-specific default is chosen (see `get_rpc_url`).
    #[arg(long, value_name = "URL")]
    pub rpc_url: Option<String>,

    /// JWT secret file path
    ///
    /// If not provided, defaults to `<datadir>/<chain>/jwt.hex`.
    /// If the file doesn't exist, it will be created automatically.
    #[arg(long, value_name = "PATH")]
    pub jwt_secret: Option<PathBuf>,

    /// Output directory for benchmark results
    #[arg(long, value_name = "PATH", default_value = "./reth-bench-compare")]
    pub output_dir: String,

    /// Skip git branch validation (useful for testing)
    #[arg(long)]
    pub skip_git_validation: bool,

    /// Port for reth metrics endpoint
    #[arg(long, value_name = "PORT", default_value = "5005")]
    pub metrics_port: u16,

    /// The chain this node is running.
    ///
    /// Possible values are either a built-in chain name or numeric chain ID.
    #[arg(long, value_name = "CHAIN", default_value = "mainnet", required = false)]
    pub chain: Chain,

    /// Run reth binary with sudo (for elevated privileges)
    #[arg(long)]
    pub sudo: bool,

    /// Generate comparison charts using Python script
    #[arg(long)]
    pub draw: bool,

    /// Enable CPU profiling with samply during benchmark runs
    #[arg(long)]
    pub profile: bool,

    /// Optional fixed delay between engine API calls (passed to reth-bench).
    ///
    /// Can be combined with `--wait-for-persistence`: when both are set,
    /// waits at least this duration, and also waits for persistence if needed.
    #[arg(long, value_name = "DURATION")]
    pub wait_time: Option<String>,

    /// Wait for blocks to be persisted before sending the next batch (passed to reth-bench).
    ///
    /// When enabled, waits for every Nth block to be persisted using the
    /// `reth_subscribePersistedBlock` subscription. This ensures the benchmark
    /// doesn't outpace persistence.
    ///
    /// Can be combined with `--wait-time`: when both are set, waits at least
    /// wait-time, and also waits for persistence if the block hasn't been persisted yet.
    #[arg(long)]
    pub wait_for_persistence: bool,

    /// Engine persistence threshold (passed to reth-bench).
    ///
    /// The benchmark waits after every `(threshold + 1)` blocks. By default this
    /// matches the engine's default persistence threshold (2), so waits occur
    /// at blocks 3, 6, 9, etc.
    #[arg(long, value_name = "PERSISTENCE_THRESHOLD")]
    pub persistence_threshold: Option<u64>,

    /// Number of blocks to run for cache warmup after clearing caches.
    /// If not specified, defaults to the same as --blocks
    #[arg(long, value_name = "N")]
    pub warmup_blocks: Option<u64>,

    /// Disable filesystem cache clearing before warmup phase.
    /// By default, filesystem caches are cleared before warmup to ensure consistent benchmarks.
    #[arg(long)]
    pub no_clear_cache: bool,

    /// Skip waiting for the node to sync before starting benchmarks.
    /// When enabled, assumes the node is already synced and skips the initial tip check.
    #[arg(long)]
    pub skip_wait_syncing: bool,

    #[command(flatten)]
    pub logs: LogArgs,

    #[command(flatten)]
    pub traces: TraceArgs,

    /// Maximum queue size for OTLP Batch Span Processor (traces).
    /// Higher values prevent trace drops when benchmarking many blocks.
    #[arg(
        long,
        value_name = "OTLP_BUFFER_SIZE",
        default_value = "32768",
        help_heading = "Tracing"
    )]
    pub otlp_max_queue_size: usize,

    /// Additional arguments to pass to baseline reth node command
    ///
    /// Example: `--baseline-args "--debug.tip 0xabc..."`
    #[arg(long, value_name = "ARGS")]
    pub baseline_args: Option<String>,

    /// Additional arguments to pass to feature reth node command
    ///
    /// Example: `--feature-args "--debug.tip 0xdef..."`
    #[arg(long, value_name = "ARGS")]
    pub feature_args: Option<String>,

    /// Additional arguments to pass to reth node command (applied to both baseline and feature)
    ///
    /// All arguments after `--` will be passed directly to the reth node command.
    /// Example: `reth-bench-compare --baseline-ref main --feature-ref pr/123 -- --debug.tip
    /// 0xabc...`
    #[arg(trailing_var_arg = true, allow_hyphen_values = true)]
    pub reth_args: Vec<String>,

    /// Comma-separated list of extra features to enable during reth compilation (applied to both
    /// builds)
    #[arg(long, value_name = "FEATURES", default_value = "")]
    pub features: String,

    /// Comma-separated list of features to enable only for baseline build (overrides --features)
    ///
    /// Example: `--baseline-features jemalloc`
    #[arg(long, value_name = "FEATURES")]
    pub baseline_features: Option<String>,

    /// Comma-separated list of features to enable only for feature build (overrides --features)
    ///
    /// Example: `--feature-features jemalloc-prof`
    #[arg(long, value_name = "FEATURES")]
    pub feature_features: Option<String>,

    /// RUSTFLAGS to use for both baseline and feature builds
    ///
    /// Example: `--rustflags "-C target-cpu=native"`
    #[arg(long, value_name = "FLAGS", default_value = "-C target-cpu=native")]
    pub rustflags: String,

    /// RUSTFLAGS to use only for baseline build (overrides --rustflags)
    ///
    /// Example: `--baseline-rustflags "-C target-cpu=native -C lto"`
    #[arg(long, value_name = "FLAGS")]
    pub baseline_rustflags: Option<String>,

    /// RUSTFLAGS to use only for feature build (overrides --rustflags)
    ///
    /// Example: `--feature-rustflags "-C target-cpu=native -C lto"`
    #[arg(long, value_name = "FLAGS")]
    pub feature_rustflags: Option<String>,

    /// Disable automatic --debug.startup-sync-state-idle flag for specific runs.
    /// Can be "baseline", "feature", or "all".
    /// By default, the flag is passed to warmup, baseline, and feature runs.
    /// When "baseline" is specified, the flag is NOT passed to warmup OR baseline.
    /// When "feature" is specified, the flag is NOT passed to feature.
    /// When "all" is specified, the flag is NOT passed to any run.
    #[arg(long, value_name = "TARGET")]
    pub disable_startup_sync_state_idle: Option<DisableStartupSyncStateIdle>,
}
238
239impl Args {
240    /// Initializes tracing with the configured options.
241    pub(crate) fn init_tracing(&self) -> Result<Option<FileWorkerGuard>> {
242        let guard = self.logs.init_tracing()?;
243        Ok(guard)
244    }
245
246    /// Build additional arguments for a specific ref type, conditionally including
247    /// --debug.startup-sync-state-idle based on the configuration
248    pub(crate) fn build_additional_args(
249        &self,
250        ref_type: &str,
251        base_args_str: Option<&String>,
252    ) -> Vec<String> {
253        // Parse the base arguments string if provided
254        let mut args = base_args_str.map(|s| parse_args_string(s)).unwrap_or_default();
255
256        // Determine if we should add the --debug.startup-sync-state-idle flag
257        let should_add_flag = match self.disable_startup_sync_state_idle {
258            None => true, // By default, add the flag
259            Some(DisableStartupSyncStateIdle::All) => false,
260            Some(DisableStartupSyncStateIdle::Baseline) => {
261                ref_type != "baseline" && ref_type != "warmup"
262            }
263            Some(DisableStartupSyncStateIdle::Feature) => ref_type != "feature",
264        };
265
266        if should_add_flag {
267            args.push("--debug.startup-sync-state-idle".to_string());
268            debug!("Adding --debug.startup-sync-state-idle flag for ref_type: {}", ref_type);
269        } else {
270            debug!("Skipping --debug.startup-sync-state-idle flag for ref_type: {}", ref_type);
271        }
272
273        args
274    }
275
276    /// Get the default RPC URL for a given chain
277    const fn get_default_rpc_url(chain: &Chain) -> &'static str {
278        match chain.id() {
279            27082 => "https://rpc.hoodi.ethpandaops.io", // hoodi
280            _ => "https://ethereum.reth.rs/rpc",         // mainnet and fallback
281        }
282    }
283
284    /// Get the RPC URL, using chain-specific default if not provided
285    pub(crate) fn get_rpc_url(&self) -> String {
286        self.rpc_url.clone().unwrap_or_else(|| Self::get_default_rpc_url(&self.chain).to_string())
287    }
288
289    /// Get the JWT secret path - either provided or derived from datadir
290    pub(crate) fn jwt_secret_path(&self) -> PathBuf {
291        match &self.jwt_secret {
292            Some(path) => path.clone(),
293            None => {
294                // Use the same logic as reth: <datadir>/<chain>/jwt.hex
295                let chain_path = self.datadir.clone().resolve_datadir(self.chain);
296                chain_path.jwt()
297            }
298        }
299    }
300
301    /// Get the resolved datadir path using the chain
302    pub(crate) fn datadir_path(&self) -> PathBuf {
303        let chain_path = self.datadir.clone().resolve_datadir(self.chain);
304        chain_path.data_dir().to_path_buf()
305    }
306
307    /// Get the output directory path
308    pub(crate) fn output_dir_path(&self) -> PathBuf {
309        PathBuf::from(&self.output_dir)
310    }
311
312    /// Get the effective warmup blocks value - either specified or defaults to blocks
313    pub(crate) fn get_warmup_blocks(&self) -> u64 {
314        self.warmup_blocks.unwrap_or(self.blocks)
315    }
316}
317
318/// Validate that the RPC endpoint chain ID matches the specified chain
319async fn validate_rpc_chain_id(rpc_url: &str, expected_chain: &Chain) -> Result<()> {
320    // Create Alloy provider
321    let url = rpc_url.parse().map_err(|e| eyre!("Invalid RPC URL '{}': {}", rpc_url, e))?;
322    let provider = ProviderBuilder::new().connect_http(url);
323
324    // Query chain ID using Alloy
325    let rpc_chain_id = provider
326        .get_chain_id()
327        .await
328        .map_err(|e| eyre!("Failed to get chain ID from RPC endpoint {}: {:?}", rpc_url, e))?;
329
330    let expected_chain_id = expected_chain.id();
331
332    if rpc_chain_id != expected_chain_id {
333        return Err(eyre!(
334            "RPC endpoint chain ID mismatch!\n\
335            Expected: {} (chain: {})\n\
336            Found: {} at RPC endpoint: {}\n\n\
337            Please use an RPC endpoint for the correct network or change the --chain argument.",
338            expected_chain_id,
339            expected_chain,
340            rpc_chain_id,
341            rpc_url
342        ));
343    }
344
345    info!("Validated RPC endpoint chain ID");
346    Ok(())
347}
348
/// Main comparison workflow execution.
///
/// Orchestrates the full run: process-group setup, git/RPC validation,
/// interrupt-handler installation, the benchmark workflow itself, and —
/// regardless of workflow success — restoration of the original git reference.
pub(crate) async fn run_comparison(args: Args, _ctx: CliContext) -> Result<()> {
    // Create a new process group for this process and all its children,
    // so the interrupt handler below can signal every child at once.
    #[cfg(unix)]
    {
        use nix::unistd::{getpid, setpgid};
        if let Err(e) = setpgid(getpid(), getpid()) {
            warn!("Failed to create process group: {e}");
        }
    }

    info!(
        "Starting benchmark comparison between '{}' and '{}'",
        args.baseline_ref, args.feature_ref
    );

    if args.sudo {
        info!("Running in sudo mode - reth commands will use elevated privileges");
    }

    // Initialize Git manager
    let git_manager = GitManager::new()?;
    // Fetch all branches, tags, and commits
    git_manager.fetch_all()?;

    // Initialize compilation manager
    let output_dir = args.output_dir_path();
    let compilation_manager = CompilationManager::new(
        git_manager.repo_root().to_string(),
        output_dir.clone(),
        git_manager.clone(),
    )?;
    // Initialize node manager
    let mut node_manager = NodeManager::new(&args);

    let benchmark_runner = BenchmarkRunner::new(&args);
    let mut comparison_generator = ComparisonGenerator::new(&args);

    // Set the comparison directory in node manager to align with results directory
    node_manager.set_comparison_dir(comparison_generator.get_output_dir());

    // Store original git state for restoration after the run (or on interrupt)
    let original_ref = git_manager.get_current_ref()?;
    info!("Current git reference: {}", original_ref);

    // Validate git state (clean tree, both refs resolvable) unless skipped
    if !args.skip_git_validation {
        git_manager.validate_clean_state()?;
        git_manager.validate_refs(&[&args.baseline_ref, &args.feature_ref])?;
    }

    // Validate RPC endpoint chain ID matches the specified chain
    let rpc_url = args.get_rpc_url();
    validate_rpc_chain_id(&rpc_url, &args.chain).await?;

    // Setup signal handling for cleanup: on Ctrl-C, terminate the whole
    // process group and restore the original git reference before exiting.
    let git_manager_cleanup = git_manager.clone();
    let original_ref_cleanup = original_ref.clone();
    ctrlc::set_handler(move || {
        eprintln!("Received interrupt signal, cleaning up...");

        // Send SIGTERM to entire process group to ensure all children exit
        #[cfg(unix)]
        {
            use nix::{
                sys::signal::{kill, Signal},
                unistd::Pid,
            };

            // Send SIGTERM to our process group (negative PID = process group)
            let current_pid = std::process::id() as i32;
            let pgid = Pid::from_raw(-current_pid);
            if let Err(e) = kill(pgid, Signal::SIGTERM) {
                eprintln!("Failed to send SIGTERM to process group: {e}");
            }
        }

        // Give a moment for any ongoing git operations to complete
        std::thread::sleep(std::time::Duration::from_millis(200));

        if let Err(e) = git_manager_cleanup.switch_ref(&original_ref_cleanup) {
            eprintln!("Failed to restore original git reference: {e}");
            eprintln!("You may need to manually run: git checkout {original_ref_cleanup}");
        }
        std::process::exit(1);
    })?;

    let result = run_benchmark_workflow(
        &git_manager,
        &compilation_manager,
        &mut node_manager,
        &benchmark_runner,
        &mut comparison_generator,
        &args,
    )
    .await;

    // Always restore original git reference, even if the workflow errored;
    // the workflow result is only propagated after restoration succeeds.
    info!("Restoring original git reference: {}", original_ref);
    git_manager.switch_ref(&original_ref)?;

    // Handle any errors from the workflow
    result?;

    Ok(())
}
455
456/// Parse a string of arguments into a vector of strings
457fn parse_args_string(args_str: &str) -> Vec<String> {
458    shlex::split(args_str).unwrap_or_else(|| {
459        // Fallback to simple whitespace splitting if shlex fails
460        args_str.split_whitespace().map(|s| s.to_string()).collect()
461    })
462}
463
/// Run compilation phase for both baseline and feature binaries.
///
/// Resolves each git reference to a fixed commit first — so a ref being
/// pushed to mid-run cannot change which code is benchmarked — then checks
/// out and compiles each ref with its per-build features/RUSTFLAGS.
/// Returns the resolved `(baseline_commit, feature_commit)` hashes.
async fn run_compilation_phase(
    git_manager: &GitManager,
    compilation_manager: &CompilationManager,
    args: &Args,
) -> Result<(String, String)> {
    info!("=== Running compilation phase ===");

    // Ensure required tools are available (only need to check once)
    compilation_manager.ensure_reth_bench_available()?;
    if args.profile {
        compilation_manager.ensure_samply_available()?;
    }

    let refs = [&args.baseline_ref, &args.feature_ref];
    let ref_types = ["baseline", "feature"];

    // First, resolve all refs to commits using a HashMap to avoid race conditions where a ref is
    // pushed to mid-run. The map also deduplicates when baseline and feature are the same ref.
    let mut ref_commits = std::collections::HashMap::new();
    for &git_ref in &refs {
        if !ref_commits.contains_key(git_ref) {
            git_manager.switch_ref(git_ref)?;
            let commit = git_manager.get_current_commit()?;
            ref_commits.insert(git_ref.clone(), commit);
            // NOTE(review): `[..8]` assumes commit hashes are at least 8 chars
            // (full SHAs are 40) — holds for git but panics otherwise.
            info!("Reference {} resolves to commit: {}", git_ref, &ref_commits[git_ref][..8]);
        }
    }

    // Now compile each ref using the resolved commits
    for (i, &git_ref) in refs.iter().enumerate() {
        let ref_type = ref_types[i];
        let commit = &ref_commits[git_ref];

        // Get per-build features and rustflags; per-ref overrides take
        // precedence over the shared --features/--rustflags values.
        let features = match ref_type {
            "baseline" => args.baseline_features.as_ref().unwrap_or(&args.features),
            "feature" => args.feature_features.as_ref().unwrap_or(&args.features),
            _ => &args.features,
        };
        let rustflags = match ref_type {
            "baseline" => args.baseline_rustflags.as_ref().unwrap_or(&args.rustflags),
            "feature" => args.feature_rustflags.as_ref().unwrap_or(&args.rustflags),
            _ => &args.rustflags,
        };

        info!(
            "Compiling {} binary for reference: {} (commit: {})",
            ref_type,
            git_ref,
            &commit[..8]
        );

        // Switch to target reference
        git_manager.switch_ref(git_ref)?;

        // Compile reth (with caching — identical commit/feature/flags reuse a prior build)
        compilation_manager.compile_reth(commit, features, rustflags)?;

        info!("Completed compilation for {} reference", ref_type);
    }

    let baseline_commit = ref_commits[&args.baseline_ref].clone();
    let feature_commit = ref_commits[&args.feature_ref].clone();

    info!("Compilation phase completed");
    Ok((baseline_commit, feature_commit))
}
532
#[allow(clippy::too_many_arguments)]
/// Run warmup phase to warm up caches before benchmarking.
///
/// Unwinds the chain by `warmup_blocks` so the warmup replay ends back at
/// `starting_tip`, starts the cached baseline binary, optionally clears
/// filesystem caches, runs the warmup, and stops the node again.
async fn run_warmup_phase(
    git_manager: &GitManager,
    compilation_manager: &CompilationManager,
    node_manager: &mut NodeManager,
    benchmark_runner: &BenchmarkRunner,
    args: &Args,
    baseline_commit: &str,
    starting_tip: u64,
) -> Result<()> {
    info!("=== Running warmup phase ===");

    // Unwind to starting block minus warmup blocks, so we end up back at starting_tip.
    // saturating_sub guards against unwinding below genesis.
    let warmup_blocks = args.get_warmup_blocks();
    let unwind_target = starting_tip.saturating_sub(warmup_blocks);
    node_manager.unwind_to_block(unwind_target).await?;

    // Use baseline for warmup
    let warmup_ref = &args.baseline_ref;

    // Switch to baseline reference
    git_manager.switch_ref(warmup_ref)?;

    // Get the cached binary path for baseline (should already be compiled)
    let binary_path = compilation_manager.get_cached_binary_path_for_commit(baseline_commit);

    // Verify the cached binary exists
    if !binary_path.exists() {
        return Err(eyre!(
            "Cached baseline binary not found at {:?}. Compilation phase should have created it.",
            binary_path
        ));
    }

    // NOTE(review): `[..8]` assumes the commit hash is at least 8 characters.
    info!("Using cached baseline binary for warmup (commit: {})", &baseline_commit[..8]);

    // Build additional args with conditional --debug.startup-sync-state-idle flag
    let additional_args = args.build_additional_args("warmup", args.baseline_args.as_ref());

    // Start reth node for warmup (command is not stored for warmup phase)
    let (mut node_process, _warmup_command) =
        node_manager.start_node(&binary_path, warmup_ref, "warmup", &additional_args).await?;

    // Wait for node to be ready and get its current tip; with
    // --skip-wait-syncing only RPC availability is awaited, not sync.
    let current_tip = if args.skip_wait_syncing {
        node_manager.wait_for_rpc_and_get_tip(&mut node_process).await?
    } else {
        node_manager.wait_for_node_ready_and_get_tip(&mut node_process).await?
    };
    info!("Warmup node is ready at tip: {}", current_tip);

    // Clear filesystem caches before warmup run only (unless disabled)
    if args.no_clear_cache {
        info!("Skipping filesystem cache clearing (--no-clear-cache flag set)");
    } else {
        BenchmarkRunner::clear_fs_caches().await?;
    }

    // Run warmup to warm up caches
    benchmark_runner.run_warmup(current_tip).await?;

    // Stop node after warmup
    node_manager.stop_node(&mut node_process).await?;

    info!("Warmup phase completed");
    Ok(())
}
601
/// Execute the complete benchmark workflow for both branches.
///
/// Phases: compile both refs, determine the starting tip, optionally warm up,
/// then for each ref unwind → start node → benchmark → stop node → record
/// results; finally generate the comparison report (and charts/profile
/// servers when requested).
async fn run_benchmark_workflow(
    git_manager: &GitManager,
    compilation_manager: &CompilationManager,
    node_manager: &mut NodeManager,
    benchmark_runner: &BenchmarkRunner,
    comparison_generator: &mut ComparisonGenerator,
    args: &Args,
) -> Result<()> {
    // Run compilation phase for both binaries
    let (baseline_commit, feature_commit) =
        run_compilation_phase(git_manager, compilation_manager, args).await?;

    // Switch to baseline reference and get the starting tip
    git_manager.switch_ref(&args.baseline_ref)?;
    let binary_path = compilation_manager.get_cached_binary_path_for_commit(&baseline_commit);
    if !binary_path.exists() {
        return Err(eyre!(
            "Cached baseline binary not found at {:?}. Compilation phase should have created it.",
            binary_path
        ));
    }

    // Start node briefly to get the current tip, then stop it
    info!("=== Determining initial block height ===");
    let additional_args = args.build_additional_args("baseline", args.baseline_args.as_ref());
    let (mut node_process, _) = node_manager
        .start_node(&binary_path, &args.baseline_ref, "baseline", &additional_args)
        .await?;
    let starting_tip = if args.skip_wait_syncing {
        node_manager.wait_for_rpc_and_get_tip(&mut node_process).await?
    } else {
        node_manager.wait_for_node_ready_and_get_tip(&mut node_process).await?
    };
    info!("Node starting tip: {}", starting_tip);
    node_manager.stop_node(&mut node_process).await?;

    // Run warmup phase before benchmarking (skip if warmup_blocks is 0)
    if args.get_warmup_blocks() > 0 {
        run_warmup_phase(
            git_manager,
            compilation_manager,
            node_manager,
            benchmark_runner,
            args,
            &baseline_commit,
            starting_tip,
        )
        .await?;
    } else {
        info!("Skipping warmup phase (warmup_blocks is 0)");
    }

    let refs = [&args.baseline_ref, &args.feature_ref];
    let ref_types = ["baseline", "feature"];
    let commits = [&baseline_commit, &feature_commit];

    for (i, &git_ref) in refs.iter().enumerate() {
        let ref_type = ref_types[i];
        let commit = commits[i];
        info!("=== Processing {} reference: {} ===", ref_type, git_ref);

        // Unwind to starting block minus benchmark blocks, so we end up back at starting_tip.
        // saturating_sub guards against unwinding below genesis.
        let unwind_target = starting_tip.saturating_sub(args.blocks);
        node_manager.unwind_to_block(unwind_target).await?;

        // Switch to target reference
        git_manager.switch_ref(git_ref)?;

        // Get the cached binary path for this git reference (should already be compiled)
        let binary_path = compilation_manager.get_cached_binary_path_for_commit(commit);

        // Verify the cached binary exists
        if !binary_path.exists() {
            return Err(eyre!(
                "Cached {} binary not found at {:?}. Compilation phase should have created it.",
                ref_type,
                binary_path
            ));
        }

        info!("Using cached {} binary (commit: {})", ref_type, &commit[..8]);

        // Get reference-specific base arguments string
        let base_args_str = match ref_type {
            "baseline" => args.baseline_args.as_ref(),
            "feature" => args.feature_args.as_ref(),
            _ => None,
        };

        // Build additional args with conditional --debug.startup-sync-state-idle flag
        let additional_args = args.build_additional_args(ref_type, base_args_str);

        // Start reth node and capture the command for reporting
        let (mut node_process, reth_command) =
            node_manager.start_node(&binary_path, git_ref, ref_type, &additional_args).await?;

        // Wait for node to be ready and get its current tip (wherever it is)
        let current_tip = if args.skip_wait_syncing {
            node_manager.wait_for_rpc_and_get_tip(&mut node_process).await?
        } else {
            node_manager.wait_for_node_ready_and_get_tip(&mut node_process).await?
        };
        info!("Node is ready at tip: {}", current_tip);

        // Calculate benchmark range
        // Note: reth-bench has an off-by-one error where it consumes the first block
        // of the range, so we add 1 to compensate and get exactly args.blocks blocks
        // (the range spans args.blocks + 1 block numbers, starting at current_tip).
        let from_block = current_tip;
        let to_block = current_tip + args.blocks;

        // Run benchmark
        let output_dir = comparison_generator.get_ref_output_dir(ref_type);

        // Capture start timestamp for the benchmark run
        let benchmark_start = chrono::Utc::now();

        // Run benchmark (comparison logic is handled separately by ComparisonGenerator)
        benchmark_runner.run_benchmark(from_block, to_block, &output_dir).await?;

        // Capture end timestamp for the benchmark run
        let benchmark_end = chrono::Utc::now();

        // Stop node
        node_manager.stop_node(&mut node_process).await?;

        // Store results for comparison
        comparison_generator.add_ref_results(ref_type, &output_dir)?;

        // Set the benchmark run timestamps and reth command
        comparison_generator.set_ref_timestamps(ref_type, benchmark_start, benchmark_end)?;
        comparison_generator.set_ref_command(ref_type, reth_command)?;

        info!("Completed {} reference benchmark", ref_type);
    }

    // Generate comparison report
    comparison_generator.generate_comparison_report().await?;

    // Generate charts if requested
    if args.draw {
        generate_comparison_charts(comparison_generator).await?;
    }

    // Start samply servers if profiling was enabled
    if args.profile {
        start_samply_servers(args).await?;
    }

    Ok(())
}
753
754/// Generate comparison charts using the Python script
755async fn generate_comparison_charts(comparison_generator: &ComparisonGenerator) -> Result<()> {
756    info!("Generating comparison charts with Python script...");
757
758    let baseline_output_dir = comparison_generator.get_ref_output_dir("baseline");
759    let feature_output_dir = comparison_generator.get_ref_output_dir("feature");
760
761    let baseline_csv = baseline_output_dir.join("combined_latency.csv");
762    let feature_csv = feature_output_dir.join("combined_latency.csv");
763
764    // Check if CSV files exist
765    if !baseline_csv.exists() {
766        return Err(eyre!("Baseline CSV not found: {:?}", baseline_csv));
767    }
768    if !feature_csv.exists() {
769        return Err(eyre!("Feature CSV not found: {:?}", feature_csv));
770    }
771
772    let output_dir = comparison_generator.get_output_dir();
773    let chart_output = output_dir.join("latency_comparison.png");
774
775    let script_path = "bin/reth-bench/scripts/compare_newpayload_latency.py";
776
777    info!("Running Python comparison script with uv...");
778    let mut cmd = Command::new("uv");
779    cmd.args([
780        "run",
781        script_path,
782        &baseline_csv.to_string_lossy(),
783        &feature_csv.to_string_lossy(),
784        "-o",
785        &chart_output.to_string_lossy(),
786    ]);
787
788    // Set process group for consistent signal handling
789    #[cfg(unix)]
790    {
791        cmd.process_group(0);
792    }
793
794    let output = cmd.output().await.map_err(|e| {
795        eyre!("Failed to execute Python script with uv: {}. Make sure uv is installed.", e)
796    })?;
797
798    if !output.status.success() {
799        let stderr = String::from_utf8_lossy(&output.stderr);
800        let stdout = String::from_utf8_lossy(&output.stdout);
801        return Err(eyre!(
802            "Python script failed with exit code {:?}:\nstdout: {}\nstderr: {}",
803            output.status.code(),
804            stdout,
805            stderr
806        ));
807    }
808
809    let stdout = String::from_utf8_lossy(&output.stdout);
810    if !stdout.trim().is_empty() {
811        info!("Python script output:\n{}", stdout);
812    }
813
814    info!("Comparison chart generated: {:?}", chart_output);
815    Ok(())
816}
817
818/// Start samply servers for viewing profiles
819async fn start_samply_servers(args: &Args) -> Result<()> {
820    info!("Starting samply servers for profile viewing...");
821
822    let output_dir = args.output_dir_path();
823    let profiles_dir = output_dir.join("profiles");
824
825    // Build profile paths
826    let baseline_profile = profiles_dir.join("baseline.json.gz");
827    let feature_profile = profiles_dir.join("feature.json.gz");
828
829    // Check if profiles exist
830    if !baseline_profile.exists() {
831        warn!("Baseline profile not found: {:?}", baseline_profile);
832        return Ok(());
833    }
834    if !feature_profile.exists() {
835        warn!("Feature profile not found: {:?}", feature_profile);
836        return Ok(());
837    }
838
839    // Find two consecutive available ports starting from 3000
840    let (baseline_port, feature_port) = find_consecutive_ports(3000)?;
841    info!("Found available ports: {} and {}", baseline_port, feature_port);
842
843    // Get samply path
844    let samply_path = get_samply_path().await?;
845
846    // Start baseline server
847    info!("Starting samply server for baseline '{}' on port {}", args.baseline_ref, baseline_port);
848    let mut baseline_cmd = Command::new(&samply_path);
849    baseline_cmd
850        .args(["load", "--port", &baseline_port.to_string(), &baseline_profile.to_string_lossy()])
851        .kill_on_drop(true);
852
853    // Set process group for consistent signal handling
854    #[cfg(unix)]
855    {
856        baseline_cmd.process_group(0);
857    }
858
859    // Conditionally pipe output based on log level
860    if tracing::enabled!(tracing::Level::DEBUG) {
861        baseline_cmd.stdout(std::process::Stdio::piped()).stderr(std::process::Stdio::piped());
862    } else {
863        baseline_cmd.stdout(std::process::Stdio::null()).stderr(std::process::Stdio::null());
864    }
865
866    // Debug log the command
867    debug!("Executing samply load command: {:?}", baseline_cmd);
868
869    let mut baseline_child =
870        baseline_cmd.spawn().wrap_err("Failed to start samply server for baseline")?;
871
872    // Stream baseline samply output if debug logging is enabled
873    if tracing::enabled!(tracing::Level::DEBUG) {
874        if let Some(stdout) = baseline_child.stdout.take() {
875            tokio::spawn(async move {
876                use tokio::io::{AsyncBufReadExt, BufReader};
877                let reader = BufReader::new(stdout);
878                let mut lines = reader.lines();
879                while let Ok(Some(line)) = lines.next_line().await {
880                    debug!("[SAMPLY-BASELINE] {}", line);
881                }
882            });
883        }
884
885        if let Some(stderr) = baseline_child.stderr.take() {
886            tokio::spawn(async move {
887                use tokio::io::{AsyncBufReadExt, BufReader};
888                let reader = BufReader::new(stderr);
889                let mut lines = reader.lines();
890                while let Ok(Some(line)) = lines.next_line().await {
891                    debug!("[SAMPLY-BASELINE] {}", line);
892                }
893            });
894        }
895    }
896
897    // Start feature server
898    info!("Starting samply server for feature '{}' on port {}", args.feature_ref, feature_port);
899    let mut feature_cmd = Command::new(&samply_path);
900    feature_cmd
901        .args(["load", "--port", &feature_port.to_string(), &feature_profile.to_string_lossy()])
902        .kill_on_drop(true);
903
904    // Set process group for consistent signal handling
905    #[cfg(unix)]
906    {
907        feature_cmd.process_group(0);
908    }
909
910    // Conditionally pipe output based on log level
911    if tracing::enabled!(tracing::Level::DEBUG) {
912        feature_cmd.stdout(std::process::Stdio::piped()).stderr(std::process::Stdio::piped());
913    } else {
914        feature_cmd.stdout(std::process::Stdio::null()).stderr(std::process::Stdio::null());
915    }
916
917    // Debug log the command
918    debug!("Executing samply load command: {:?}", feature_cmd);
919
920    let mut feature_child =
921        feature_cmd.spawn().wrap_err("Failed to start samply server for feature")?;
922
923    // Stream feature samply output if debug logging is enabled
924    if tracing::enabled!(tracing::Level::DEBUG) {
925        if let Some(stdout) = feature_child.stdout.take() {
926            tokio::spawn(async move {
927                use tokio::io::{AsyncBufReadExt, BufReader};
928                let reader = BufReader::new(stdout);
929                let mut lines = reader.lines();
930                while let Ok(Some(line)) = lines.next_line().await {
931                    debug!("[SAMPLY-FEATURE] {}", line);
932                }
933            });
934        }
935
936        if let Some(stderr) = feature_child.stderr.take() {
937            tokio::spawn(async move {
938                use tokio::io::{AsyncBufReadExt, BufReader};
939                let reader = BufReader::new(stderr);
940                let mut lines = reader.lines();
941                while let Ok(Some(line)) = lines.next_line().await {
942                    debug!("[SAMPLY-FEATURE] {}", line);
943                }
944            });
945        }
946    }
947
948    // Give servers time to start
949    tokio::time::sleep(std::time::Duration::from_secs(2)).await;
950
951    // Print access information
952    println!("\n=== SAMPLY PROFILE SERVERS STARTED ===");
953    println!("Baseline '{}': http://127.0.0.1:{}", args.baseline_ref, baseline_port);
954    println!("Feature  '{}': http://127.0.0.1:{}", args.feature_ref, feature_port);
955    println!("\nOpen the URLs in your browser to view the profiles.");
956    println!("Press Ctrl+C to stop the servers and exit.");
957    println!("=========================================\n");
958
959    // Wait for Ctrl+C or process termination
960    let ctrl_c = tokio::signal::ctrl_c();
961    let baseline_wait = baseline_child.wait();
962    let feature_wait = feature_child.wait();
963
964    tokio::select! {
965        _ = ctrl_c => {
966            info!("Received Ctrl+C, shutting down samply servers...");
967        }
968        result = baseline_wait => {
969            match result {
970                Ok(status) => info!("Baseline samply server exited with status: {}", status),
971                Err(e) => warn!("Baseline samply server error: {}", e),
972            }
973        }
974        result = feature_wait => {
975            match result {
976                Ok(status) => info!("Feature samply server exited with status: {}", status),
977                Err(e) => warn!("Feature samply server error: {}", e),
978            }
979        }
980    }
981
982    // Ensure both processes are terminated
983    let _ = baseline_child.kill().await;
984    let _ = feature_child.kill().await;
985
986    info!("Samply servers stopped.");
987    Ok(())
988}
989
990/// Find two consecutive available ports starting from the given port
991fn find_consecutive_ports(start_port: u16) -> Result<(u16, u16)> {
992    for port in start_port..=65533 {
993        // Check if both port and port+1 are available
994        if is_port_available(port) && is_port_available(port + 1) {
995            return Ok((port, port + 1));
996        }
997    }
998    Err(eyre!("Could not find two consecutive available ports starting from {}", start_port))
999}
1000
/// Check if a port is available by attempting to bind to it
///
/// The probe listener is dropped immediately, releasing the port again, so
/// this is only a point-in-time availability check on the loopback address.
fn is_port_available(port: u16) -> bool {
    let probe = TcpListener::bind(("127.0.0.1", port));
    probe.is_ok()
}
1005
1006/// Get the absolute path to samply using 'which' command
1007async fn get_samply_path() -> Result<String> {
1008    let output = Command::new("which")
1009        .arg("samply")
1010        .output()
1011        .await
1012        .wrap_err("Failed to execute 'which samply' command")?;
1013
1014    if !output.status.success() {
1015        return Err(eyre!("samply not found in PATH"));
1016    }
1017
1018    let samply_path = String::from_utf8(output.stdout)
1019        .wrap_err("samply path is not valid UTF-8")?
1020        .trim()
1021        .to_string();
1022
1023    if samply_path.is_empty() {
1024        return Err(eyre!("which samply returned empty path"));
1025    }
1026
1027    Ok(samply_path)
1028}