reth_bench_compare/
cli.rs

1//! CLI argument parsing and main command orchestration.
2
3use alloy_provider::{Provider, ProviderBuilder};
4use clap::Parser;
5use eyre::{eyre, Result, WrapErr};
6use reth_chainspec::Chain;
7use reth_cli_runner::CliContext;
8use reth_node_core::args::{DatadirArgs, LogArgs, TraceArgs};
9use reth_tracing::FileWorkerGuard;
10use std::{net::TcpListener, path::PathBuf, str::FromStr};
11use tokio::process::Command;
12use tracing::{debug, info, warn};
13
14use crate::{
15    benchmark::BenchmarkRunner, comparison::ComparisonGenerator, compilation::CompilationManager,
16    git::GitManager, node::NodeManager,
17};
18
/// Selects which runs should NOT receive the automatically added
/// `--debug.startup-sync-state-idle` flag.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum DisableStartupSyncStateIdle {
    /// Leave the flag off for the baseline run and for the warmup run.
    Baseline,
    /// Leave the flag off for the feature run only.
    Feature,
    /// Leave the flag off for every run.
    All,
}
29
30impl FromStr for DisableStartupSyncStateIdle {
31    type Err = String;
32
33    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
34        match s.to_lowercase().as_str() {
35            "baseline" => Ok(Self::Baseline),
36            "feature" => Ok(Self::Feature),
37            "all" => Ok(Self::All),
38            _ => Err(format!("Invalid value '{}'. Expected 'baseline', 'feature', or 'all'", s)),
39        }
40    }
41}
42
43impl std::fmt::Display for DisableStartupSyncStateIdle {
44    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
45        match self {
46            Self::Baseline => write!(f, "baseline"),
47            Self::Feature => write!(f, "feature"),
48            Self::All => write!(f, "all"),
49        }
50    }
51}
52
// NOTE: clap's derive macro turns the `///` comments on the fields below into `--help` text, so
// editing them changes user-visible output. Maintainer-only notes in this struct therefore use
// plain `//` comments.
/// Automated reth benchmark comparison between git references
#[derive(Debug, Parser)]
#[command(
    name = "reth-bench-compare",
    about = "Compare reth performance between two git references (branches or tags)",
    version
)]
pub(crate) struct Args {
    /// Git reference (branch or tag) to use as baseline for comparison
    #[arg(long, value_name = "REF")]
    pub baseline_ref: String,

    /// Git reference (branch or tag) to compare against the baseline
    #[arg(long, value_name = "REF")]
    pub feature_ref: String,

    #[command(flatten)]
    pub datadir: DatadirArgs,

    /// Number of blocks to benchmark
    #[arg(long, value_name = "N", default_value = "100")]
    pub blocks: u64,

    // When absent, `Args::get_rpc_url` falls back to a chain-specific default URL.
    /// RPC endpoint for fetching block data
    #[arg(long, value_name = "URL")]
    pub rpc_url: Option<String>,

    /// JWT secret file path
    ///
    /// If not provided, defaults to `<datadir>/<chain>/jwt.hex`.
    /// If the file doesn't exist, it will be created automatically.
    #[arg(long, value_name = "PATH")]
    pub jwt_secret: Option<PathBuf>,

    // Stored as a raw string; `Args::output_dir_path` performs `~` expansion on it.
    /// Output directory for benchmark results
    #[arg(long, value_name = "PATH", default_value = "./reth-bench-compare")]
    pub output_dir: String,

    /// Skip git branch validation (useful for testing)
    #[arg(long)]
    pub skip_git_validation: bool,

    /// Port for reth metrics endpoint
    #[arg(long, value_name = "PORT", default_value = "5005")]
    pub metrics_port: u16,

    /// The chain this node is running.
    ///
    /// Possible values are either a built-in chain name or numeric chain ID.
    #[arg(long, value_name = "CHAIN", default_value = "mainnet", required = false)]
    pub chain: Chain,

    /// Run reth binary with sudo (for elevated privileges)
    #[arg(long)]
    pub sudo: bool,

    /// Generate comparison charts using Python script
    #[arg(long)]
    pub draw: bool,

    /// Enable CPU profiling with samply during benchmark runs
    #[arg(long)]
    pub profile: bool,

    // `hide = true` keeps this compatibility flag out of the generated help output.
    /// Optional fixed delay between engine API calls (passed to reth-bench).
    ///
    /// When set, reth-bench uses wait-time mode and disables persistence-based flow.
    /// This flag remains for compatibility with older scripts.
    #[arg(long, value_name = "DURATION", hide = true)]
    pub wait_time: Option<String>,

    /// Wait for blocks to be persisted before sending the next batch (passed to reth-bench).
    ///
    /// When enabled, waits for every Nth block to be persisted using the
    /// `reth_subscribePersistedBlock` subscription. This ensures the benchmark
    /// doesn't outpace persistence.
    #[arg(long)]
    pub wait_for_persistence: bool,

    /// Engine persistence threshold (passed to reth-bench).
    ///
    /// The benchmark waits after every `(threshold + 1)` blocks. By default this
    /// matches the engine's default persistence threshold (2), so waits occur
    /// at blocks 3, 6, 9, etc.
    #[arg(long, value_name = "PERSISTENCE_THRESHOLD")]
    pub persistence_threshold: Option<u64>,

    // `Args::get_warmup_blocks` resolves the `None` case to `blocks`.
    /// Number of blocks to run for cache warmup after clearing caches.
    /// If not specified, defaults to the same as --blocks
    #[arg(long, value_name = "N")]
    pub warmup_blocks: Option<u64>,

    /// Disable filesystem cache clearing before warmup phase.
    /// By default, filesystem caches are cleared before warmup to ensure consistent benchmarks.
    #[arg(long)]
    pub no_clear_cache: bool,

    /// Skip waiting for the node to sync before starting benchmarks.
    /// When enabled, assumes the node is already synced and skips the initial tip check.
    #[arg(long)]
    pub skip_wait_syncing: bool,

    #[command(flatten)]
    pub logs: LogArgs,

    #[command(flatten)]
    pub traces: TraceArgs,

    /// Maximum queue size for OTLP Batch Span Processor (traces).
    /// Higher values prevent trace drops when benchmarking many blocks.
    #[arg(
        long,
        value_name = "OTLP_BUFFER_SIZE",
        default_value = "32768",
        help_heading = "Tracing"
    )]
    pub otlp_max_queue_size: usize,

    // The raw string is split into individual arguments by `parse_args_string` inside
    // `Args::build_additional_args`.
    /// Additional arguments to pass to baseline reth node command
    ///
    /// Example: `--baseline-args "--debug.tip 0xabc..."`
    #[arg(long, value_name = "ARGS")]
    pub baseline_args: Option<String>,

    /// Additional arguments to pass to feature reth node command
    ///
    /// Example: `--feature-args "--debug.tip 0xdef..."`
    #[arg(long, value_name = "ARGS")]
    pub feature_args: Option<String>,

    // Positional (no `long`): `trailing_var_arg` + `allow_hyphen_values` let this collect the
    // remaining command-line tokens even when they start with `-`.
    /// Additional arguments to pass to reth node command (applied to both baseline and feature)
    ///
    /// All arguments after `--` will be passed directly to the reth node command.
    /// Example: `reth-bench-compare --baseline-ref main --feature-ref pr/123 -- --debug.tip
    /// 0xabc...`
    #[arg(trailing_var_arg = true, allow_hyphen_values = true)]
    pub reth_args: Vec<String>,

    /// Comma-separated list of features to enable during reth compilation (applied to both builds)
    ///
    /// Example: `jemalloc,asm-keccak`
    #[arg(long, value_name = "FEATURES", default_value = "jemalloc,asm-keccak")]
    pub features: String,

    /// Comma-separated list of features to enable only for baseline build (overrides --features)
    ///
    /// Example: `--baseline-features jemalloc`
    #[arg(long, value_name = "FEATURES")]
    pub baseline_features: Option<String>,

    /// Comma-separated list of features to enable only for feature build (overrides --features)
    ///
    /// Example: `--feature-features jemalloc,asm-keccak`
    #[arg(long, value_name = "FEATURES")]
    pub feature_features: Option<String>,

    /// RUSTFLAGS to use for both baseline and feature builds
    ///
    /// Example: `--rustflags "-C target-cpu=native"`
    #[arg(long, value_name = "FLAGS", default_value = "-C target-cpu=native")]
    pub rustflags: String,

    /// RUSTFLAGS to use only for baseline build (overrides --rustflags)
    ///
    /// Example: `--baseline-rustflags "-C target-cpu=native -C lto"`
    #[arg(long, value_name = "FLAGS")]
    pub baseline_rustflags: Option<String>,

    /// RUSTFLAGS to use only for feature build (overrides --rustflags)
    ///
    /// Example: `--feature-rustflags "-C target-cpu=native -C lto"`
    #[arg(long, value_name = "FLAGS")]
    pub feature_rustflags: Option<String>,

    // NOTE(review): presumably parsed through the `FromStr` impl of
    // `DisableStartupSyncStateIdle` (the type does not derive `ValueEnum`) — confirm.
    /// Disable automatic --debug.startup-sync-state-idle flag for specific runs.
    /// Can be "baseline", "feature", or "all".
    /// By default, the flag is passed to warmup, baseline, and feature runs.
    /// When "baseline" is specified, the flag is NOT passed to warmup OR baseline.
    /// When "feature" is specified, the flag is NOT passed to feature.
    /// When "all" is specified, the flag is NOT passed to any run.
    #[arg(long, value_name = "TARGET")]
    pub disable_startup_sync_state_idle: Option<DisableStartupSyncStateIdle>,
}
236
237impl Args {
238    /// Initializes tracing with the configured options.
239    pub(crate) fn init_tracing(&self) -> Result<Option<FileWorkerGuard>> {
240        let guard = self.logs.init_tracing()?;
241        Ok(guard)
242    }
243
244    /// Build additional arguments for a specific ref type, conditionally including
245    /// --debug.startup-sync-state-idle based on the configuration
246    pub(crate) fn build_additional_args(
247        &self,
248        ref_type: &str,
249        base_args_str: Option<&String>,
250    ) -> Vec<String> {
251        // Parse the base arguments string if provided
252        let mut args = base_args_str.map(|s| parse_args_string(s)).unwrap_or_default();
253
254        // Determine if we should add the --debug.startup-sync-state-idle flag
255        let should_add_flag = match self.disable_startup_sync_state_idle {
256            None => true, // By default, add the flag
257            Some(DisableStartupSyncStateIdle::All) => false,
258            Some(DisableStartupSyncStateIdle::Baseline) => {
259                ref_type != "baseline" && ref_type != "warmup"
260            }
261            Some(DisableStartupSyncStateIdle::Feature) => ref_type != "feature",
262        };
263
264        if should_add_flag {
265            args.push("--debug.startup-sync-state-idle".to_string());
266            debug!("Adding --debug.startup-sync-state-idle flag for ref_type: {}", ref_type);
267        } else {
268            debug!("Skipping --debug.startup-sync-state-idle flag for ref_type: {}", ref_type);
269        }
270
271        args
272    }
273
274    /// Get the default RPC URL for a given chain
275    const fn get_default_rpc_url(chain: &Chain) -> &'static str {
276        match chain.id() {
277            8453 => "https://base-mainnet.rpc.ithaca.xyz",  // base
278            84532 => "https://base-sepolia.rpc.ithaca.xyz", // base-sepolia
279            27082 => "https://rpc.hoodi.ethpandaops.io",    // hoodi
280            _ => "https://reth-ethereum.ithaca.xyz/rpc",    // mainnet and fallback
281        }
282    }
283
284    /// Get the RPC URL, using chain-specific default if not provided
285    pub(crate) fn get_rpc_url(&self) -> String {
286        self.rpc_url.clone().unwrap_or_else(|| Self::get_default_rpc_url(&self.chain).to_string())
287    }
288
289    /// Get the JWT secret path - either provided or derived from datadir
290    pub(crate) fn jwt_secret_path(&self) -> PathBuf {
291        match &self.jwt_secret {
292            Some(path) => {
293                let jwt_secret_str = path.to_string_lossy();
294                let expanded = shellexpand::tilde(&jwt_secret_str);
295                PathBuf::from(expanded.as_ref())
296            }
297            None => {
298                // Use the same logic as reth: <datadir>/<chain>/jwt.hex
299                let chain_path = self.datadir.clone().resolve_datadir(self.chain);
300                chain_path.jwt()
301            }
302        }
303    }
304
305    /// Get the resolved datadir path using the chain
306    pub(crate) fn datadir_path(&self) -> PathBuf {
307        let chain_path = self.datadir.clone().resolve_datadir(self.chain);
308        chain_path.data_dir().to_path_buf()
309    }
310
311    /// Get the expanded output directory path
312    pub(crate) fn output_dir_path(&self) -> PathBuf {
313        let expanded = shellexpand::tilde(&self.output_dir);
314        PathBuf::from(expanded.as_ref())
315    }
316
317    /// Get the effective warmup blocks value - either specified or defaults to blocks
318    pub(crate) fn get_warmup_blocks(&self) -> u64 {
319        self.warmup_blocks.unwrap_or(self.blocks)
320    }
321}
322
323/// Validate that the RPC endpoint chain ID matches the specified chain
324async fn validate_rpc_chain_id(rpc_url: &str, expected_chain: &Chain) -> Result<()> {
325    // Create Alloy provider
326    let url = rpc_url.parse().map_err(|e| eyre!("Invalid RPC URL '{}': {}", rpc_url, e))?;
327    let provider = ProviderBuilder::new().connect_http(url);
328
329    // Query chain ID using Alloy
330    let rpc_chain_id = provider
331        .get_chain_id()
332        .await
333        .map_err(|e| eyre!("Failed to get chain ID from RPC endpoint {}: {:?}", rpc_url, e))?;
334
335    let expected_chain_id = expected_chain.id();
336
337    if rpc_chain_id != expected_chain_id {
338        return Err(eyre!(
339            "RPC endpoint chain ID mismatch!\n\
340            Expected: {} (chain: {})\n\
341            Found: {} at RPC endpoint: {}\n\n\
342            Please use an RPC endpoint for the correct network or change the --chain argument.",
343            expected_chain_id,
344            expected_chain,
345            rpc_chain_id,
346            rpc_url
347        ));
348    }
349
350    info!("Validated RPC endpoint chain ID");
351    Ok(())
352}
353
354/// Main comparison workflow execution
355pub(crate) async fn run_comparison(args: Args, _ctx: CliContext) -> Result<()> {
356    // Create a new process group for this process and all its children
357    #[cfg(unix)]
358    {
359        use nix::unistd::{getpid, setpgid};
360        if let Err(e) = setpgid(getpid(), getpid()) {
361            warn!("Failed to create process group: {e}");
362        }
363    }
364
365    info!(
366        "Starting benchmark comparison between '{}' and '{}'",
367        args.baseline_ref, args.feature_ref
368    );
369
370    if args.sudo {
371        info!("Running in sudo mode - reth commands will use elevated privileges");
372    }
373
374    // Initialize Git manager
375    let git_manager = GitManager::new()?;
376    // Fetch all branches, tags, and commits
377    git_manager.fetch_all()?;
378
379    // Initialize compilation manager
380    let output_dir = args.output_dir_path();
381    let compilation_manager = CompilationManager::new(
382        git_manager.repo_root().to_string(),
383        output_dir.clone(),
384        git_manager.clone(),
385    )?;
386    // Initialize node manager
387    let mut node_manager = NodeManager::new(&args);
388
389    let benchmark_runner = BenchmarkRunner::new(&args);
390    let mut comparison_generator = ComparisonGenerator::new(&args);
391
392    // Set the comparison directory in node manager to align with results directory
393    node_manager.set_comparison_dir(comparison_generator.get_output_dir());
394
395    // Store original git state for restoration
396    let original_ref = git_manager.get_current_ref()?;
397    info!("Current git reference: {}", original_ref);
398
399    // Validate git state
400    if !args.skip_git_validation {
401        git_manager.validate_clean_state()?;
402        git_manager.validate_refs(&[&args.baseline_ref, &args.feature_ref])?;
403    }
404
405    // Validate RPC endpoint chain ID matches the specified chain
406    let rpc_url = args.get_rpc_url();
407    validate_rpc_chain_id(&rpc_url, &args.chain).await?;
408
409    // Setup signal handling for cleanup
410    let git_manager_cleanup = git_manager.clone();
411    let original_ref_cleanup = original_ref.clone();
412    ctrlc::set_handler(move || {
413        eprintln!("Received interrupt signal, cleaning up...");
414
415        // Send SIGTERM to entire process group to ensure all children exit
416        #[cfg(unix)]
417        {
418            use nix::{
419                sys::signal::{kill, Signal},
420                unistd::Pid,
421            };
422
423            // Send SIGTERM to our process group (negative PID = process group)
424            let current_pid = std::process::id() as i32;
425            let pgid = Pid::from_raw(-current_pid);
426            if let Err(e) = kill(pgid, Signal::SIGTERM) {
427                eprintln!("Failed to send SIGTERM to process group: {e}");
428            }
429        }
430
431        // Give a moment for any ongoing git operations to complete
432        std::thread::sleep(std::time::Duration::from_millis(200));
433
434        if let Err(e) = git_manager_cleanup.switch_ref(&original_ref_cleanup) {
435            eprintln!("Failed to restore original git reference: {e}");
436            eprintln!("You may need to manually run: git checkout {original_ref_cleanup}");
437        }
438        std::process::exit(1);
439    })?;
440
441    let result = run_benchmark_workflow(
442        &git_manager,
443        &compilation_manager,
444        &mut node_manager,
445        &benchmark_runner,
446        &mut comparison_generator,
447        &args,
448    )
449    .await;
450
451    // Always restore original git reference
452    info!("Restoring original git reference: {}", original_ref);
453    git_manager.switch_ref(&original_ref)?;
454
455    // Handle any errors from the workflow
456    result?;
457
458    Ok(())
459}
460
461/// Parse a string of arguments into a vector of strings
462fn parse_args_string(args_str: &str) -> Vec<String> {
463    shlex::split(args_str).unwrap_or_else(|| {
464        // Fallback to simple whitespace splitting if shlex fails
465        args_str.split_whitespace().map(|s| s.to_string()).collect()
466    })
467}
468
469/// Run compilation phase for both baseline and feature binaries
470async fn run_compilation_phase(
471    git_manager: &GitManager,
472    compilation_manager: &CompilationManager,
473    args: &Args,
474    is_optimism: bool,
475) -> Result<(String, String)> {
476    info!("=== Running compilation phase ===");
477
478    // Ensure required tools are available (only need to check once)
479    compilation_manager.ensure_reth_bench_available()?;
480    if args.profile {
481        compilation_manager.ensure_samply_available()?;
482    }
483
484    let refs = [&args.baseline_ref, &args.feature_ref];
485    let ref_types = ["baseline", "feature"];
486
487    // First, resolve all refs to commits using a HashMap to avoid race conditions where a ref is
488    // pushed to mid-run.
489    let mut ref_commits = std::collections::HashMap::new();
490    for &git_ref in &refs {
491        if !ref_commits.contains_key(git_ref) {
492            git_manager.switch_ref(git_ref)?;
493            let commit = git_manager.get_current_commit()?;
494            ref_commits.insert(git_ref.clone(), commit);
495            info!("Reference {} resolves to commit: {}", git_ref, &ref_commits[git_ref][..8]);
496        }
497    }
498
499    // Now compile each ref using the resolved commits
500    for (i, &git_ref) in refs.iter().enumerate() {
501        let ref_type = ref_types[i];
502        let commit = &ref_commits[git_ref];
503
504        // Get per-build features and rustflags
505        let features = match ref_type {
506            "baseline" => args.baseline_features.as_ref().unwrap_or(&args.features),
507            "feature" => args.feature_features.as_ref().unwrap_or(&args.features),
508            _ => &args.features,
509        };
510        let rustflags = match ref_type {
511            "baseline" => args.baseline_rustflags.as_ref().unwrap_or(&args.rustflags),
512            "feature" => args.feature_rustflags.as_ref().unwrap_or(&args.rustflags),
513            _ => &args.rustflags,
514        };
515
516        info!(
517            "Compiling {} binary for reference: {} (commit: {})",
518            ref_type,
519            git_ref,
520            &commit[..8]
521        );
522
523        // Switch to target reference
524        git_manager.switch_ref(git_ref)?;
525
526        // Compile reth (with caching)
527        compilation_manager.compile_reth(commit, is_optimism, features, rustflags)?;
528
529        info!("Completed compilation for {} reference", ref_type);
530    }
531
532    let baseline_commit = ref_commits[&args.baseline_ref].clone();
533    let feature_commit = ref_commits[&args.feature_ref].clone();
534
535    info!("Compilation phase completed");
536    Ok((baseline_commit, feature_commit))
537}
538
539#[allow(clippy::too_many_arguments)]
540/// Run warmup phase to warm up caches before benchmarking
541async fn run_warmup_phase(
542    git_manager: &GitManager,
543    compilation_manager: &CompilationManager,
544    node_manager: &mut NodeManager,
545    benchmark_runner: &BenchmarkRunner,
546    args: &Args,
547    is_optimism: bool,
548    baseline_commit: &str,
549    starting_tip: u64,
550) -> Result<()> {
551    info!("=== Running warmup phase ===");
552
553    // Unwind to starting block minus warmup blocks, so we end up back at starting_tip
554    let warmup_blocks = args.get_warmup_blocks();
555    let unwind_target = starting_tip.saturating_sub(warmup_blocks);
556    node_manager.unwind_to_block(unwind_target).await?;
557
558    // Use baseline for warmup
559    let warmup_ref = &args.baseline_ref;
560
561    // Switch to baseline reference
562    git_manager.switch_ref(warmup_ref)?;
563
564    // Get the cached binary path for baseline (should already be compiled)
565    let binary_path =
566        compilation_manager.get_cached_binary_path_for_commit(baseline_commit, is_optimism);
567
568    // Verify the cached binary exists
569    if !binary_path.exists() {
570        return Err(eyre!(
571            "Cached baseline binary not found at {:?}. Compilation phase should have created it.",
572            binary_path
573        ));
574    }
575
576    info!("Using cached baseline binary for warmup (commit: {})", &baseline_commit[..8]);
577
578    // Build additional args with conditional --debug.startup-sync-state-idle flag
579    let additional_args = args.build_additional_args("warmup", args.baseline_args.as_ref());
580
581    // Start reth node for warmup (command is not stored for warmup phase)
582    let (mut node_process, _warmup_command) =
583        node_manager.start_node(&binary_path, warmup_ref, "warmup", &additional_args).await?;
584
585    // Wait for node to be ready and get its current tip
586    let current_tip = if args.skip_wait_syncing {
587        node_manager.wait_for_rpc_and_get_tip(&mut node_process).await?
588    } else {
589        node_manager.wait_for_node_ready_and_get_tip(&mut node_process).await?
590    };
591    info!("Warmup node is ready at tip: {}", current_tip);
592
593    // Clear filesystem caches before warmup run only (unless disabled)
594    if args.no_clear_cache {
595        info!("Skipping filesystem cache clearing (--no-clear-cache flag set)");
596    } else {
597        BenchmarkRunner::clear_fs_caches().await?;
598    }
599
600    // Run warmup to warm up caches
601    benchmark_runner.run_warmup(current_tip).await?;
602
603    // Stop node after warmup
604    node_manager.stop_node(&mut node_process).await?;
605
606    info!("Warmup phase completed");
607    Ok(())
608}
609
610/// Execute the complete benchmark workflow for both branches
611async fn run_benchmark_workflow(
612    git_manager: &GitManager,
613    compilation_manager: &CompilationManager,
614    node_manager: &mut NodeManager,
615    benchmark_runner: &BenchmarkRunner,
616    comparison_generator: &mut ComparisonGenerator,
617    args: &Args,
618) -> Result<()> {
619    // Detect if this is an Optimism chain once at the beginning
620    let rpc_url = args.get_rpc_url();
621    let is_optimism = compilation_manager.detect_optimism_chain(&rpc_url).await?;
622
623    // Run compilation phase for both binaries
624    let (baseline_commit, feature_commit) =
625        run_compilation_phase(git_manager, compilation_manager, args, is_optimism).await?;
626
627    // Switch to baseline reference and get the starting tip
628    git_manager.switch_ref(&args.baseline_ref)?;
629    let binary_path =
630        compilation_manager.get_cached_binary_path_for_commit(&baseline_commit, is_optimism);
631    if !binary_path.exists() {
632        return Err(eyre!(
633            "Cached baseline binary not found at {:?}. Compilation phase should have created it.",
634            binary_path
635        ));
636    }
637
638    // Start node briefly to get the current tip, then stop it
639    info!("=== Determining initial block height ===");
640    let additional_args = args.build_additional_args("baseline", args.baseline_args.as_ref());
641    let (mut node_process, _) = node_manager
642        .start_node(&binary_path, &args.baseline_ref, "baseline", &additional_args)
643        .await?;
644    let starting_tip = if args.skip_wait_syncing {
645        node_manager.wait_for_rpc_and_get_tip(&mut node_process).await?
646    } else {
647        node_manager.wait_for_node_ready_and_get_tip(&mut node_process).await?
648    };
649    info!("Node starting tip: {}", starting_tip);
650    node_manager.stop_node(&mut node_process).await?;
651
652    // Run warmup phase before benchmarking (skip if warmup_blocks is 0)
653    if args.get_warmup_blocks() > 0 {
654        run_warmup_phase(
655            git_manager,
656            compilation_manager,
657            node_manager,
658            benchmark_runner,
659            args,
660            is_optimism,
661            &baseline_commit,
662            starting_tip,
663        )
664        .await?;
665    } else {
666        info!("Skipping warmup phase (warmup_blocks is 0)");
667    }
668
669    let refs = [&args.baseline_ref, &args.feature_ref];
670    let ref_types = ["baseline", "feature"];
671    let commits = [&baseline_commit, &feature_commit];
672
673    for (i, &git_ref) in refs.iter().enumerate() {
674        let ref_type = ref_types[i];
675        let commit = commits[i];
676        info!("=== Processing {} reference: {} ===", ref_type, git_ref);
677
678        // Unwind to starting block minus benchmark blocks, so we end up back at starting_tip
679        let unwind_target = starting_tip.saturating_sub(args.blocks);
680        node_manager.unwind_to_block(unwind_target).await?;
681
682        // Switch to target reference
683        git_manager.switch_ref(git_ref)?;
684
685        // Get the cached binary path for this git reference (should already be compiled)
686        let binary_path =
687            compilation_manager.get_cached_binary_path_for_commit(commit, is_optimism);
688
689        // Verify the cached binary exists
690        if !binary_path.exists() {
691            return Err(eyre!(
692                "Cached {} binary not found at {:?}. Compilation phase should have created it.",
693                ref_type,
694                binary_path
695            ));
696        }
697
698        info!("Using cached {} binary (commit: {})", ref_type, &commit[..8]);
699
700        // Get reference-specific base arguments string
701        let base_args_str = match ref_type {
702            "baseline" => args.baseline_args.as_ref(),
703            "feature" => args.feature_args.as_ref(),
704            _ => None,
705        };
706
707        // Build additional args with conditional --debug.startup-sync-state-idle flag
708        let additional_args = args.build_additional_args(ref_type, base_args_str);
709
710        // Start reth node and capture the command for reporting
711        let (mut node_process, reth_command) =
712            node_manager.start_node(&binary_path, git_ref, ref_type, &additional_args).await?;
713
714        // Wait for node to be ready and get its current tip (wherever it is)
715        let current_tip = if args.skip_wait_syncing {
716            node_manager.wait_for_rpc_and_get_tip(&mut node_process).await?
717        } else {
718            node_manager.wait_for_node_ready_and_get_tip(&mut node_process).await?
719        };
720        info!("Node is ready at tip: {}", current_tip);
721
722        // Calculate benchmark range
723        // Note: reth-bench has an off-by-one error where it consumes the first block
724        // of the range, so we add 1 to compensate and get exactly args.blocks blocks
725        let from_block = current_tip;
726        let to_block = current_tip + args.blocks;
727
728        // Run benchmark
729        let output_dir = comparison_generator.get_ref_output_dir(ref_type);
730
731        // Capture start timestamp for the benchmark run
732        let benchmark_start = chrono::Utc::now();
733
734        // Run benchmark (comparison logic is handled separately by ComparisonGenerator)
735        benchmark_runner.run_benchmark(from_block, to_block, &output_dir).await?;
736
737        // Capture end timestamp for the benchmark run
738        let benchmark_end = chrono::Utc::now();
739
740        // Stop node
741        node_manager.stop_node(&mut node_process).await?;
742
743        // Store results for comparison
744        comparison_generator.add_ref_results(ref_type, &output_dir)?;
745
746        // Set the benchmark run timestamps and reth command
747        comparison_generator.set_ref_timestamps(ref_type, benchmark_start, benchmark_end)?;
748        comparison_generator.set_ref_command(ref_type, reth_command)?;
749
750        info!("Completed {} reference benchmark", ref_type);
751    }
752
753    // Generate comparison report
754    comparison_generator.generate_comparison_report().await?;
755
756    // Generate charts if requested
757    if args.draw {
758        generate_comparison_charts(comparison_generator).await?;
759    }
760
761    // Start samply servers if profiling was enabled
762    if args.profile {
763        start_samply_servers(args).await?;
764    }
765
766    Ok(())
767}
768
769/// Generate comparison charts using the Python script
770async fn generate_comparison_charts(comparison_generator: &ComparisonGenerator) -> Result<()> {
771    info!("Generating comparison charts with Python script...");
772
773    let baseline_output_dir = comparison_generator.get_ref_output_dir("baseline");
774    let feature_output_dir = comparison_generator.get_ref_output_dir("feature");
775
776    let baseline_csv = baseline_output_dir.join("combined_latency.csv");
777    let feature_csv = feature_output_dir.join("combined_latency.csv");
778
779    // Check if CSV files exist
780    if !baseline_csv.exists() {
781        return Err(eyre!("Baseline CSV not found: {:?}", baseline_csv));
782    }
783    if !feature_csv.exists() {
784        return Err(eyre!("Feature CSV not found: {:?}", feature_csv));
785    }
786
787    let output_dir = comparison_generator.get_output_dir();
788    let chart_output = output_dir.join("latency_comparison.png");
789
790    let script_path = "bin/reth-bench/scripts/compare_newpayload_latency.py";
791
792    info!("Running Python comparison script with uv...");
793    let mut cmd = Command::new("uv");
794    cmd.args([
795        "run",
796        script_path,
797        &baseline_csv.to_string_lossy(),
798        &feature_csv.to_string_lossy(),
799        "-o",
800        &chart_output.to_string_lossy(),
801    ]);
802
803    // Set process group for consistent signal handling
804    #[cfg(unix)]
805    {
806        cmd.process_group(0);
807    }
808
809    let output = cmd.output().await.map_err(|e| {
810        eyre!("Failed to execute Python script with uv: {}. Make sure uv is installed.", e)
811    })?;
812
813    if !output.status.success() {
814        let stderr = String::from_utf8_lossy(&output.stderr);
815        let stdout = String::from_utf8_lossy(&output.stdout);
816        return Err(eyre!(
817            "Python script failed with exit code {:?}:\nstdout: {}\nstderr: {}",
818            output.status.code(),
819            stdout,
820            stderr
821        ));
822    }
823
824    let stdout = String::from_utf8_lossy(&output.stdout);
825    if !stdout.trim().is_empty() {
826        info!("Python script output:\n{}", stdout);
827    }
828
829    info!("Comparison chart generated: {:?}", chart_output);
830    Ok(())
831}
832
833/// Start samply servers for viewing profiles
834async fn start_samply_servers(args: &Args) -> Result<()> {
835    info!("Starting samply servers for profile viewing...");
836
837    let output_dir = args.output_dir_path();
838    let profiles_dir = output_dir.join("profiles");
839
840    // Build profile paths
841    let baseline_profile = profiles_dir.join("baseline.json.gz");
842    let feature_profile = profiles_dir.join("feature.json.gz");
843
844    // Check if profiles exist
845    if !baseline_profile.exists() {
846        warn!("Baseline profile not found: {:?}", baseline_profile);
847        return Ok(());
848    }
849    if !feature_profile.exists() {
850        warn!("Feature profile not found: {:?}", feature_profile);
851        return Ok(());
852    }
853
854    // Find two consecutive available ports starting from 3000
855    let (baseline_port, feature_port) = find_consecutive_ports(3000)?;
856    info!("Found available ports: {} and {}", baseline_port, feature_port);
857
858    // Get samply path
859    let samply_path = get_samply_path().await?;
860
861    // Start baseline server
862    info!("Starting samply server for baseline '{}' on port {}", args.baseline_ref, baseline_port);
863    let mut baseline_cmd = Command::new(&samply_path);
864    baseline_cmd
865        .args(["load", "--port", &baseline_port.to_string(), &baseline_profile.to_string_lossy()])
866        .kill_on_drop(true);
867
868    // Set process group for consistent signal handling
869    #[cfg(unix)]
870    {
871        baseline_cmd.process_group(0);
872    }
873
874    // Conditionally pipe output based on log level
875    if tracing::enabled!(tracing::Level::DEBUG) {
876        baseline_cmd.stdout(std::process::Stdio::piped()).stderr(std::process::Stdio::piped());
877    } else {
878        baseline_cmd.stdout(std::process::Stdio::null()).stderr(std::process::Stdio::null());
879    }
880
881    // Debug log the command
882    debug!("Executing samply load command: {:?}", baseline_cmd);
883
884    let mut baseline_child =
885        baseline_cmd.spawn().wrap_err("Failed to start samply server for baseline")?;
886
887    // Stream baseline samply output if debug logging is enabled
888    if tracing::enabled!(tracing::Level::DEBUG) {
889        if let Some(stdout) = baseline_child.stdout.take() {
890            tokio::spawn(async move {
891                use tokio::io::{AsyncBufReadExt, BufReader};
892                let reader = BufReader::new(stdout);
893                let mut lines = reader.lines();
894                while let Ok(Some(line)) = lines.next_line().await {
895                    debug!("[SAMPLY-BASELINE] {}", line);
896                }
897            });
898        }
899
900        if let Some(stderr) = baseline_child.stderr.take() {
901            tokio::spawn(async move {
902                use tokio::io::{AsyncBufReadExt, BufReader};
903                let reader = BufReader::new(stderr);
904                let mut lines = reader.lines();
905                while let Ok(Some(line)) = lines.next_line().await {
906                    debug!("[SAMPLY-BASELINE] {}", line);
907                }
908            });
909        }
910    }
911
912    // Start feature server
913    info!("Starting samply server for feature '{}' on port {}", args.feature_ref, feature_port);
914    let mut feature_cmd = Command::new(&samply_path);
915    feature_cmd
916        .args(["load", "--port", &feature_port.to_string(), &feature_profile.to_string_lossy()])
917        .kill_on_drop(true);
918
919    // Set process group for consistent signal handling
920    #[cfg(unix)]
921    {
922        feature_cmd.process_group(0);
923    }
924
925    // Conditionally pipe output based on log level
926    if tracing::enabled!(tracing::Level::DEBUG) {
927        feature_cmd.stdout(std::process::Stdio::piped()).stderr(std::process::Stdio::piped());
928    } else {
929        feature_cmd.stdout(std::process::Stdio::null()).stderr(std::process::Stdio::null());
930    }
931
932    // Debug log the command
933    debug!("Executing samply load command: {:?}", feature_cmd);
934
935    let mut feature_child =
936        feature_cmd.spawn().wrap_err("Failed to start samply server for feature")?;
937
938    // Stream feature samply output if debug logging is enabled
939    if tracing::enabled!(tracing::Level::DEBUG) {
940        if let Some(stdout) = feature_child.stdout.take() {
941            tokio::spawn(async move {
942                use tokio::io::{AsyncBufReadExt, BufReader};
943                let reader = BufReader::new(stdout);
944                let mut lines = reader.lines();
945                while let Ok(Some(line)) = lines.next_line().await {
946                    debug!("[SAMPLY-FEATURE] {}", line);
947                }
948            });
949        }
950
951        if let Some(stderr) = feature_child.stderr.take() {
952            tokio::spawn(async move {
953                use tokio::io::{AsyncBufReadExt, BufReader};
954                let reader = BufReader::new(stderr);
955                let mut lines = reader.lines();
956                while let Ok(Some(line)) = lines.next_line().await {
957                    debug!("[SAMPLY-FEATURE] {}", line);
958                }
959            });
960        }
961    }
962
963    // Give servers time to start
964    tokio::time::sleep(std::time::Duration::from_secs(2)).await;
965
966    // Print access information
967    println!("\n=== SAMPLY PROFILE SERVERS STARTED ===");
968    println!("Baseline '{}': http://127.0.0.1:{}", args.baseline_ref, baseline_port);
969    println!("Feature  '{}': http://127.0.0.1:{}", args.feature_ref, feature_port);
970    println!("\nOpen the URLs in your browser to view the profiles.");
971    println!("Press Ctrl+C to stop the servers and exit.");
972    println!("=========================================\n");
973
974    // Wait for Ctrl+C or process termination
975    let ctrl_c = tokio::signal::ctrl_c();
976    let baseline_wait = baseline_child.wait();
977    let feature_wait = feature_child.wait();
978
979    tokio::select! {
980        _ = ctrl_c => {
981            info!("Received Ctrl+C, shutting down samply servers...");
982        }
983        result = baseline_wait => {
984            match result {
985                Ok(status) => info!("Baseline samply server exited with status: {}", status),
986                Err(e) => warn!("Baseline samply server error: {}", e),
987            }
988        }
989        result = feature_wait => {
990            match result {
991                Ok(status) => info!("Feature samply server exited with status: {}", status),
992                Err(e) => warn!("Feature samply server error: {}", e),
993            }
994        }
995    }
996
997    // Ensure both processes are terminated
998    let _ = baseline_child.kill().await;
999    let _ = feature_child.kill().await;
1000
1001    info!("Samply servers stopped.");
1002    Ok(())
1003}
1004
1005/// Find two consecutive available ports starting from the given port
1006fn find_consecutive_ports(start_port: u16) -> Result<(u16, u16)> {
1007    for port in start_port..=65533 {
1008        // Check if both port and port+1 are available
1009        if is_port_available(port) && is_port_available(port + 1) {
1010            return Ok((port, port + 1));
1011        }
1012    }
1013    Err(eyre!("Could not find two consecutive available ports starting from {}", start_port))
1014}
1015
/// Report whether `port` can currently be bound on `127.0.0.1`.
///
/// The probe listener is dropped before returning, so the port is released again and is not
/// reserved for the caller.
fn is_port_available(port: u16) -> bool {
    match TcpListener::bind(("127.0.0.1", port)) {
        Ok(_probe) => true,
        Err(_) => false,
    }
}
1020
1021/// Get the absolute path to samply using 'which' command
1022async fn get_samply_path() -> Result<String> {
1023    let output = Command::new("which")
1024        .arg("samply")
1025        .output()
1026        .await
1027        .wrap_err("Failed to execute 'which samply' command")?;
1028
1029    if !output.status.success() {
1030        return Err(eyre!("samply not found in PATH"));
1031    }
1032
1033    let samply_path = String::from_utf8(output.stdout)
1034        .wrap_err("samply path is not valid UTF-8")?
1035        .trim()
1036        .to_string();
1037
1038    if samply_path.is_empty() {
1039        return Err(eyre!("which samply returned empty path"));
1040    }
1041
1042    Ok(samply_path)
1043}