1use crate::cli::Args;
4use alloy_provider::{Provider, ProviderBuilder};
5use alloy_rpc_client::RpcClient;
6use alloy_rpc_types_eth::SyncStatus;
7use alloy_transport_ws::WsConnect;
8use eyre::{eyre, OptionExt, Result, WrapErr};
9#[cfg(unix)]
10use nix::sys::signal::{killpg, Signal};
11#[cfg(unix)]
12use nix::unistd::Pid;
13use reth_chainspec::Chain;
14use std::{fs, path::PathBuf, time::Duration};
15use tokio::{
16 fs::File as AsyncFile,
17 io::{AsyncBufReadExt, AsyncWriteExt, BufReader as AsyncBufReader},
18 process::Command,
19 time::{sleep, timeout},
20};
21use tracing::{debug, info, warn};
22
/// WebSocket JSON-RPC port probed by the readiness checks (`ws://localhost:<port>`).
const DEFAULT_WS_RPC_PORT: u16 = 8_546;
25
26pub(crate) struct NodeManager {
28 datadir: Option<String>,
29 metrics_port: u16,
30 chain: Chain,
31 use_sudo: bool,
32 binary_path: Option<std::path::PathBuf>,
33 enable_profiling: bool,
34 output_dir: PathBuf,
35 additional_reth_args: Vec<String>,
36 comparison_dir: Option<PathBuf>,
37 tracing_endpoint: Option<String>,
38 otlp_max_queue_size: usize,
39}
40
41impl NodeManager {
42 pub(crate) fn new(args: &Args) -> Self {
44 Self {
45 datadir: Some(args.datadir_path().to_string_lossy().to_string()),
46 metrics_port: args.metrics_port,
47 chain: args.chain,
48 use_sudo: args.sudo,
49 binary_path: None,
50 enable_profiling: args.profile,
51 output_dir: args.output_dir_path(),
52 additional_reth_args: args
54 .reth_args
55 .iter()
56 .filter(|s| !s.is_empty())
57 .cloned()
58 .collect(),
59 comparison_dir: None,
60 tracing_endpoint: args.traces.otlp.as_ref().map(|u| u.to_string()),
61 otlp_max_queue_size: args.otlp_max_queue_size,
62 }
63 }
64
65 pub(crate) fn set_comparison_dir(&mut self, dir: PathBuf) {
67 self.comparison_dir = Some(dir);
68 }
69
70 fn get_log_file_path(&self, ref_type: &str) -> Result<PathBuf> {
72 let comparison_dir = self
73 .comparison_dir
74 .as_ref()
75 .ok_or_eyre("Comparison directory not set. Call set_comparison_dir first.")?;
76
77 let log_dir = comparison_dir.join(ref_type);
79
80 fs::create_dir_all(&log_dir)
82 .wrap_err(format!("Failed to create log directory: {:?}", log_dir))?;
83
84 let log_file = log_dir.join("reth_node.log");
85 Ok(log_file)
86 }
87
88 fn get_perf_sample_rate(&self) -> Option<String> {
90 let perf_rate_file = "/proc/sys/kernel/perf_event_max_sample_rate";
91 if let Ok(content) = fs::read_to_string(perf_rate_file) {
92 let rate_str = content.trim();
93 if !rate_str.is_empty() {
94 if let Ok(system_rate) = rate_str.parse::<u32>() {
95 let capped_rate = std::cmp::min(system_rate, 10000);
96 info!(
97 "Detected perf_event_max_sample_rate: {}, using: {}",
98 system_rate, capped_rate
99 );
100 return Some(capped_rate.to_string());
101 }
102 warn!("Failed to parse perf_event_max_sample_rate: {}", rate_str);
103 }
104 }
105 None
106 }
107
108 async fn get_samply_path(&self) -> Result<String> {
110 let output = Command::new("which")
111 .arg("samply")
112 .output()
113 .await
114 .wrap_err("Failed to execute 'which samply' command")?;
115
116 if !output.status.success() {
117 return Err(eyre!("samply not found in PATH"));
118 }
119
120 let samply_path = String::from_utf8(output.stdout)
121 .wrap_err("samply path is not valid UTF-8")?
122 .trim()
123 .to_string();
124
125 if samply_path.is_empty() {
126 return Err(eyre!("which samply returned empty path"));
127 }
128
129 Ok(samply_path)
130 }
131
132 fn build_reth_args(
134 &self,
135 binary_path_str: &str,
136 additional_args: &[String],
137 ref_type: &str,
138 ) -> (Vec<String>, String) {
139 let mut reth_args = vec![binary_path_str.to_string(), "node".to_string()];
140
141 let chain_str = self.chain.to_string();
143 if chain_str != "mainnet" {
144 reth_args.extend_from_slice(&["--chain".to_string(), chain_str.clone()]);
145 }
146
147 if let Some(ref datadir) = self.datadir {
149 reth_args.extend_from_slice(&["--datadir".to_string(), datadir.clone()]);
150 }
151
152 let metrics_arg = format!("0.0.0.0:{}", self.metrics_port);
154 reth_args.extend_from_slice(&[
155 "--engine.accept-execution-requests-hash".to_string(),
156 "--metrics".to_string(),
157 metrics_arg,
158 "--http".to_string(),
159 "--http.api".to_string(),
160 "eth,reth".to_string(),
161 "--ws".to_string(),
162 "--ws.api".to_string(),
163 "eth,reth".to_string(),
164 "--disable-discovery".to_string(),
165 "--trusted-only".to_string(),
166 "--disable-tx-gossip".to_string(),
167 ]);
168
169 if let Some(ref endpoint) = self.tracing_endpoint {
171 info!("Enabling OTLP tracing export to: {} (service: reth-{})", endpoint, ref_type);
172 reth_args.push(format!("--tracing-otlp={}", endpoint));
174 }
175
176 reth_args.extend_from_slice(&self.additional_reth_args);
179
180 reth_args.extend_from_slice(additional_args);
182
183 (reth_args, chain_str)
184 }
185
186 async fn create_profiling_command(
188 &self,
189 ref_type: &str,
190 reth_args: &[String],
191 ) -> Result<Command> {
192 let profile_dir = self.output_dir.join("profiles");
194 fs::create_dir_all(&profile_dir).wrap_err("Failed to create profiles directory")?;
195
196 let profile_path = profile_dir.join(format!("{}.json.gz", ref_type));
197 info!("Starting reth node with samply profiling...");
198 info!("Profile output: {:?}", profile_path);
199
200 let samply_path = self.get_samply_path().await?;
202
203 let mut cmd = if self.use_sudo {
204 let mut sudo_cmd = Command::new("sudo");
205 sudo_cmd.arg(&samply_path);
206 sudo_cmd
207 } else {
208 Command::new(&samply_path)
209 };
210
211 cmd.args(["record", "--save-only", "-o", &profile_path.to_string_lossy()]);
213
214 if let Some(rate) = self.get_perf_sample_rate() {
216 cmd.args(["--rate", &rate]);
217 }
218
219 cmd.arg("--");
221 cmd.args(reth_args);
222
223 if supports_samply_flags(&reth_args[0]) {
225 cmd.arg("--log.samply");
226 }
227
228 cmd.env("RUST_LOG_STYLE", "never");
230
231 Ok(cmd)
232 }
233
234 fn create_direct_command(&self, reth_args: &[String]) -> Command {
236 let binary_path = &reth_args[0];
237
238 let mut cmd = if self.use_sudo {
239 info!("Starting reth node with sudo...");
240 let mut sudo_cmd = Command::new("sudo");
241 sudo_cmd.args(reth_args);
242 sudo_cmd
243 } else {
244 info!("Starting reth node...");
245 let mut reth_cmd = Command::new(binary_path);
246 reth_cmd.args(&reth_args[1..]); reth_cmd
248 };
249
250 cmd.env("RUST_LOG_STYLE", "never");
252
253 cmd
254 }
255
256 pub(crate) async fn start_node(
259 &mut self,
260 binary_path: &std::path::Path,
261 _git_ref: &str,
262 ref_type: &str,
263 additional_args: &[String],
264 ) -> Result<(tokio::process::Child, String)> {
265 self.binary_path = Some(binary_path.to_path_buf());
267
268 let binary_path_str = binary_path.to_string_lossy();
269 let (reth_args, _) = self.build_reth_args(&binary_path_str, additional_args, ref_type);
270
271 let reth_command = shlex::try_join(reth_args.iter().map(|s| s.as_str()))
273 .wrap_err("Failed to format reth command string")?;
274
275 if !self.additional_reth_args.is_empty() {
277 info!("Using common additional reth arguments: {:?}", self.additional_reth_args);
278 }
279 if !additional_args.is_empty() {
280 info!("Using reference-specific additional reth arguments: {:?}", additional_args);
281 }
282
283 let mut cmd = if self.enable_profiling {
284 self.create_profiling_command(ref_type, &reth_args).await?
285 } else {
286 self.create_direct_command(&reth_args)
287 };
288
289 #[cfg(unix)]
291 {
292 cmd.process_group(0);
293 }
294
295 if self.tracing_endpoint.is_some() {
297 cmd.env("OTEL_BSP_MAX_QUEUE_SIZE", self.otlp_max_queue_size.to_string()); cmd.env("OTEL_BLRP_MAX_QUEUE_SIZE", "10000"); cmd.env("OTEL_SERVICE_NAME", format!("reth-{}", ref_type));
302 }
303
304 debug!("Executing reth command: {cmd:?}");
305
306 let mut child = cmd
307 .stdout(std::process::Stdio::piped())
308 .stderr(std::process::Stdio::piped())
309 .kill_on_drop(true) .spawn()
311 .wrap_err("Failed to start reth node")?;
312
313 info!(
314 "Reth node started with PID: {:?} (binary: {})",
315 child.id().ok_or_eyre("Reth node is not running")?,
316 binary_path_str
317 );
318
319 let log_file_path = self.get_log_file_path(ref_type)?;
321 info!("Reth node logs will be saved to: {:?}", log_file_path);
322
323 if let Some(stdout) = child.stdout.take() {
325 let log_file = AsyncFile::create(&log_file_path)
326 .await
327 .wrap_err(format!("Failed to create log file: {:?}", log_file_path))?;
328 tokio::spawn(async move {
329 let reader = AsyncBufReader::new(stdout);
330 let mut lines = reader.lines();
331 let mut log_file = log_file;
332 while let Ok(Some(line)) = lines.next_line().await {
333 debug!("[RETH] {}", line);
334 let log_line = format!("{}\n", line);
336 if let Err(e) = log_file.write_all(log_line.as_bytes()).await {
337 debug!("Failed to write to log file: {}", e);
338 }
339 }
340 });
341 }
342
343 if let Some(stderr) = child.stderr.take() {
344 let log_file = AsyncFile::options()
345 .create(true)
346 .append(true)
347 .open(&log_file_path)
348 .await
349 .wrap_err(format!("Failed to open log file for stderr: {:?}", log_file_path))?;
350 tokio::spawn(async move {
351 let reader = AsyncBufReader::new(stderr);
352 let mut lines = reader.lines();
353 let mut log_file = log_file;
354 while let Ok(Some(line)) = lines.next_line().await {
355 debug!("[RETH] {}", line);
356 let log_line = format!("{}\n", line);
358 if let Err(e) = log_file.write_all(log_line.as_bytes()).await {
359 debug!("Failed to write to log file: {}", e);
360 }
361 }
362 });
363 }
364
365 sleep(Duration::from_secs(5)).await;
367
368 Ok((child, reth_command))
369 }
370
371 pub(crate) async fn wait_for_node_ready_and_get_tip(
375 &self,
376 child: &mut tokio::process::Child,
377 ) -> Result<u64> {
378 info!("Waiting for node to be ready and synced...");
379
380 let max_wait = Duration::from_secs(120); let check_interval = Duration::from_secs(2);
382 let rpc_url = "http://localhost:8545";
383
384 let url = rpc_url.parse().map_err(|e| eyre!("Invalid RPC URL '{}': {}", rpc_url, e))?;
386 let provider = ProviderBuilder::new().connect_http(url);
387
388 let start_time = tokio::time::Instant::now();
389 let mut iteration = 0;
390
391 timeout(max_wait, async {
392 loop {
393 iteration += 1;
394 debug!(
395 "Readiness check iteration {} (elapsed: {:?})",
396 iteration,
397 start_time.elapsed()
398 );
399
400 if let Some(status) = child.try_wait()? {
402 return Err(eyre!("Node process exited unexpectedly with {status}"));
403 }
404
405 match provider.syncing().await {
407 Ok(sync_result) => {
408 match sync_result {
409 SyncStatus::Info(sync_info) => {
410 debug!("Node is still syncing {sync_info:?}, waiting...");
411 }
412 _ => {
413 debug!("HTTP RPC is up and node is not syncing, checking block number...");
414 match provider.get_block_number().await {
416 Ok(tip) => {
417 debug!("HTTP RPC ready at block: {}, checking WebSocket...", tip);
418
419 let ws_url = format!("ws://localhost:{}", DEFAULT_WS_RPC_PORT);
421 debug!("Attempting WebSocket connection to {} (public endpoint)", ws_url);
422 let ws_connect = WsConnect::new(&ws_url);
423
424 match RpcClient::connect_pubsub(ws_connect).await
425 {
426 Ok(_) => {
427 info!(
428 "Node is ready (HTTP and WebSocket) at block: {} (took {:?}, {} iterations)",
429 tip, start_time.elapsed(), iteration
430 );
431 return Ok(tip);
432 }
433 Err(e) => {
434 debug!(
435 "HTTP RPC ready but WebSocket not ready yet (iteration {}): {:?}",
436 iteration, e
437 );
438 debug!("WebSocket error details: {}", e);
439 }
440 }
441 }
442 Err(e) => {
443 debug!("Failed to get block number (iteration {}): {:?}", iteration, e);
444 }
445 }
446 }
447 }
448 }
449 Err(e) => {
450 debug!("Node RPC not ready yet or failed to check sync status (iteration {}): {:?}", iteration, e);
451 }
452 }
453
454 debug!("Sleeping for {:?} before next check", check_interval);
455 sleep(check_interval).await;
456 }
457 })
458 .await
459 .wrap_err("Timed out waiting for node to be ready and synced")?
460 }
461
462 pub(crate) async fn wait_for_rpc_and_get_tip(
467 &self,
468 child: &mut tokio::process::Child,
469 ) -> Result<u64> {
470 info!("Waiting for node RPC to be ready (skipping sync wait)...");
471
472 let max_wait = Duration::from_secs(60);
473 let check_interval = Duration::from_secs(2);
474 let rpc_url = "http://localhost:8545";
475
476 let url = rpc_url.parse().map_err(|e| eyre!("Invalid RPC URL '{}': {}", rpc_url, e))?;
477 let provider = ProviderBuilder::new().connect_http(url);
478
479 let start_time = tokio::time::Instant::now();
480 let mut iteration = 0;
481
482 timeout(max_wait, async {
483 loop {
484 iteration += 1;
485 debug!(
486 "RPC readiness check iteration {} (elapsed: {:?})",
487 iteration,
488 start_time.elapsed()
489 );
490
491 if let Some(status) = child.try_wait()? {
492 return Err(eyre!("Node process exited unexpectedly with {status}"));
493 }
494
495 match provider.get_block_number().await {
496 Ok(tip) => {
497 debug!("HTTP RPC ready at block: {}, checking WebSocket...", tip);
498
499 let ws_url = format!("ws://localhost:{}", DEFAULT_WS_RPC_PORT);
500 let ws_connect = WsConnect::new(&ws_url);
501
502 match RpcClient::connect_pubsub(ws_connect).await {
503 Ok(_) => {
504 info!(
505 "Node RPC is ready at block: {} (took {:?}, {} iterations)",
506 tip,
507 start_time.elapsed(),
508 iteration
509 );
510 return Ok(tip);
511 }
512 Err(e) => {
513 debug!(
514 "HTTP RPC ready but WebSocket not ready yet (iteration {}): {:?}",
515 iteration, e
516 );
517 }
518 }
519 }
520 Err(e) => {
521 debug!("RPC not ready yet (iteration {}): {:?}", iteration, e);
522 }
523 }
524
525 sleep(check_interval).await;
526 }
527 })
528 .await
529 .wrap_err("Timed out waiting for node RPC to be ready")?
530 }
531
532 pub(crate) async fn stop_node(&self, child: &mut tokio::process::Child) -> Result<()> {
534 let pid = child.id().ok_or_eyre("Child process ID should be available")?;
535
536 match child.try_wait() {
538 Ok(Some(status)) => {
539 info!("Reth node (PID: {}) has already exited with status: {:?}", pid, status);
540 return Ok(());
541 }
542 Ok(None) => {
543 info!("Stopping process gracefully with SIGINT (PID: {})...", pid);
545 }
546 Err(e) => {
547 return Err(eyre!("Failed to check process status: {}", e));
548 }
549 }
550
551 #[cfg(unix)]
552 {
553 let nix_pgid = Pid::from_raw(pid as i32);
555
556 match killpg(nix_pgid, Signal::SIGINT) {
557 Ok(()) => {}
558 Err(nix::errno::Errno::ESRCH) => {
559 info!("Process group {} has already exited", pid);
560 }
561 Err(e) => {
562 return Err(eyre!("Failed to send SIGINT to process group {}: {}", pid, e));
563 }
564 }
565 }
566
567 #[cfg(not(unix))]
568 {
569 let output = Command::new("taskkill")
571 .args(["/PID", &pid.to_string(), "/F"])
572 .output()
573 .await
574 .wrap_err("Failed to execute taskkill command")?;
575
576 if !output.status.success() {
577 let stderr = String::from_utf8_lossy(&output.stderr);
578 if stderr.contains("not found") || stderr.contains("not exist") {
580 info!("Process {} has already exited", pid);
581 } else {
582 return Err(eyre!("Failed to kill process {}: {}", pid, stderr));
583 }
584 }
585 }
586
587 match child.wait().await {
589 Ok(status) => {
590 info!("Reth node (PID: {}) exited with status: {:?}", pid, status);
591 }
592 Err(e) => {
593 debug!("Error waiting for process exit (may have already exited): {}", e);
595 }
596 }
597
598 Ok(())
599 }
600
601 pub(crate) async fn unwind_to_block(&self, block_number: u64) -> Result<()> {
603 if self.use_sudo {
604 info!("Unwinding node to block: {} (with sudo)", block_number);
605 } else {
606 info!("Unwinding node to block: {}", block_number);
607 }
608
609 let binary_path = self
611 .binary_path
612 .as_ref()
613 .map(|p| p.to_string_lossy().to_string())
614 .unwrap_or_else(|| "./target/profiling/reth".to_string());
615
616 let mut cmd = if self.use_sudo {
617 let mut sudo_cmd = Command::new("sudo");
618 sudo_cmd.args([&binary_path, "stage", "unwind"]);
619 sudo_cmd
620 } else {
621 let mut reth_cmd = Command::new(&binary_path);
622 reth_cmd.args(["stage", "unwind"]);
623 reth_cmd
624 };
625
626 let chain_str = self.chain.to_string();
628 if chain_str != "mainnet" {
629 cmd.args(["--chain", &chain_str]);
630 }
631
632 if let Some(ref datadir) = self.datadir {
634 cmd.args(["--datadir", datadir]);
635 }
636
637 cmd.args(["to-block", &block_number.to_string()]);
638
639 cmd.env("RUST_LOG_STYLE", "never");
641
642 debug!("Executing reth unwind command: {:?}", cmd);
644
645 let mut child = cmd
646 .stdout(std::process::Stdio::piped())
647 .stderr(std::process::Stdio::piped())
648 .spawn()
649 .wrap_err("Failed to start unwind command")?;
650
651 if let Some(stdout) = child.stdout.take() {
653 tokio::spawn(async move {
654 let reader = AsyncBufReader::new(stdout);
655 let mut lines = reader.lines();
656 while let Ok(Some(line)) = lines.next_line().await {
657 debug!("[RETH-UNWIND] {}", line);
658 }
659 });
660 }
661
662 if let Some(stderr) = child.stderr.take() {
663 tokio::spawn(async move {
664 let reader = AsyncBufReader::new(stderr);
665 let mut lines = reader.lines();
666 while let Ok(Some(line)) = lines.next_line().await {
667 debug!("[RETH-UNWIND] {}", line);
668 }
669 });
670 }
671
672 let status = child.wait().await.wrap_err("Failed to wait for unwind command")?;
674
675 if !status.success() {
676 return Err(eyre!("Unwind command failed with exit code: {:?}", status.code()));
677 }
678
679 info!("Unwound to block: {}", block_number);
680 Ok(())
681 }
682}
683
684fn supports_samply_flags(bin: &str) -> bool {
685 let mut cmd = std::process::Command::new(bin);
686 cmd.args(["--log.samply", "--help"]);
689 debug!(?cmd, "Checking samply flags support");
690 let Ok(output) = cmd.output() else {
691 return false;
692 };
693 debug!(?output, "Samply flags support check");
694 output.status.success()
695}