1use crate::cli::Args;
4use alloy_provider::{Provider, ProviderBuilder};
5use alloy_rpc_types_eth::SyncStatus;
6use eyre::{eyre, OptionExt, Result, WrapErr};
7#[cfg(unix)]
8use nix::sys::signal::{killpg, Signal};
9#[cfg(unix)]
10use nix::unistd::Pid;
11use reth_chainspec::Chain;
12use std::{fs, path::PathBuf, time::Duration};
13use tokio::{
14 fs::File as AsyncFile,
15 io::{AsyncBufReadExt, AsyncWriteExt, BufReader as AsyncBufReader},
16 process::Command,
17 time::{sleep, timeout},
18};
19use tracing::{debug, info, warn};
20
21pub(crate) struct NodeManager {
23 datadir: Option<String>,
24 metrics_port: u16,
25 chain: Chain,
26 use_sudo: bool,
27 binary_path: Option<std::path::PathBuf>,
28 enable_profiling: bool,
29 output_dir: PathBuf,
30 additional_reth_args: Vec<String>,
31 comparison_dir: Option<PathBuf>,
32 tracing_endpoint: Option<String>,
33 otlp_max_queue_size: usize,
34}
35
36impl NodeManager {
37 pub(crate) fn new(args: &Args) -> Self {
39 Self {
40 datadir: Some(args.datadir_path().to_string_lossy().to_string()),
41 metrics_port: args.metrics_port,
42 chain: args.chain,
43 use_sudo: args.sudo,
44 binary_path: None,
45 enable_profiling: args.profile,
46 output_dir: args.output_dir_path(),
47 additional_reth_args: args
49 .reth_args
50 .iter()
51 .filter(|s| !s.is_empty())
52 .cloned()
53 .collect(),
54 comparison_dir: None,
55 tracing_endpoint: args.traces.otlp.as_ref().map(|u| u.to_string()),
56 otlp_max_queue_size: args.otlp_max_queue_size,
57 }
58 }
59
60 pub(crate) fn set_comparison_dir(&mut self, dir: PathBuf) {
62 self.comparison_dir = Some(dir);
63 }
64
65 fn get_log_file_path(&self, ref_type: &str) -> Result<PathBuf> {
67 let comparison_dir = self
68 .comparison_dir
69 .as_ref()
70 .ok_or_eyre("Comparison directory not set. Call set_comparison_dir first.")?;
71
72 let log_dir = comparison_dir.join(ref_type);
74
75 fs::create_dir_all(&log_dir)
77 .wrap_err(format!("Failed to create log directory: {:?}", log_dir))?;
78
79 let log_file = log_dir.join("reth_node.log");
80 Ok(log_file)
81 }
82
83 fn get_perf_sample_rate(&self) -> Option<String> {
85 let perf_rate_file = "/proc/sys/kernel/perf_event_max_sample_rate";
86 if let Ok(content) = fs::read_to_string(perf_rate_file) {
87 let rate_str = content.trim();
88 if !rate_str.is_empty() {
89 if let Ok(system_rate) = rate_str.parse::<u32>() {
90 let capped_rate = std::cmp::min(system_rate, 10000);
91 info!(
92 "Detected perf_event_max_sample_rate: {}, using: {}",
93 system_rate, capped_rate
94 );
95 return Some(capped_rate.to_string());
96 }
97 warn!("Failed to parse perf_event_max_sample_rate: {}", rate_str);
98 }
99 }
100 None
101 }
102
103 async fn get_samply_path(&self) -> Result<String> {
105 let output = Command::new("which")
106 .arg("samply")
107 .output()
108 .await
109 .wrap_err("Failed to execute 'which samply' command")?;
110
111 if !output.status.success() {
112 return Err(eyre!("samply not found in PATH"));
113 }
114
115 let samply_path = String::from_utf8(output.stdout)
116 .wrap_err("samply path is not valid UTF-8")?
117 .trim()
118 .to_string();
119
120 if samply_path.is_empty() {
121 return Err(eyre!("which samply returned empty path"));
122 }
123
124 Ok(samply_path)
125 }
126
127 fn build_reth_args(
129 &self,
130 binary_path_str: &str,
131 additional_args: &[String],
132 ref_type: &str,
133 ) -> (Vec<String>, String) {
134 let mut reth_args = vec![binary_path_str.to_string(), "node".to_string()];
135
136 let chain_str = self.chain.to_string();
138 if chain_str != "mainnet" {
139 reth_args.extend_from_slice(&["--chain".to_string(), chain_str.clone()]);
140 }
141
142 if let Some(ref datadir) = self.datadir {
144 reth_args.extend_from_slice(&["--datadir".to_string(), datadir.clone()]);
145 }
146
147 let metrics_arg = format!("0.0.0.0:{}", self.metrics_port);
149 reth_args.extend_from_slice(&[
150 "--engine.accept-execution-requests-hash".to_string(),
151 "--metrics".to_string(),
152 metrics_arg,
153 "--http".to_string(),
154 "--http.api".to_string(),
155 "eth".to_string(),
156 "--disable-discovery".to_string(),
157 "--trusted-only".to_string(),
158 ]);
159
160 if let Some(ref endpoint) = self.tracing_endpoint {
162 info!("Enabling OTLP tracing export to: {} (service: reth-{})", endpoint, ref_type);
163 reth_args.push(format!("--tracing-otlp={}", endpoint));
165 }
166
167 reth_args.extend_from_slice(&self.additional_reth_args);
170
171 reth_args.extend_from_slice(additional_args);
173
174 (reth_args, chain_str)
175 }
176
177 async fn create_profiling_command(
179 &self,
180 ref_type: &str,
181 reth_args: &[String],
182 ) -> Result<Command> {
183 let profile_dir = self.output_dir.join("profiles");
185 fs::create_dir_all(&profile_dir).wrap_err("Failed to create profiles directory")?;
186
187 let profile_path = profile_dir.join(format!("{}.json.gz", ref_type));
188 info!("Starting reth node with samply profiling...");
189 info!("Profile output: {:?}", profile_path);
190
191 let samply_path = self.get_samply_path().await?;
193
194 let mut cmd = if self.use_sudo {
195 let mut sudo_cmd = Command::new("sudo");
196 sudo_cmd.arg(&samply_path);
197 sudo_cmd
198 } else {
199 Command::new(&samply_path)
200 };
201
202 cmd.args(["record", "--save-only", "-o", &profile_path.to_string_lossy()]);
204
205 if let Some(rate) = self.get_perf_sample_rate() {
207 cmd.args(["--rate", &rate]);
208 }
209
210 cmd.arg("--");
212 cmd.args(reth_args);
213
214 if supports_samply_flags(&reth_args[0]) {
216 cmd.arg("--log.samply");
217 }
218
219 cmd.env("RUST_LOG_STYLE", "never");
221
222 Ok(cmd)
223 }
224
225 fn create_direct_command(&self, reth_args: &[String]) -> Command {
227 let binary_path = &reth_args[0];
228
229 let mut cmd = if self.use_sudo {
230 info!("Starting reth node with sudo...");
231 let mut sudo_cmd = Command::new("sudo");
232 sudo_cmd.args(reth_args);
233 sudo_cmd
234 } else {
235 info!("Starting reth node...");
236 let mut reth_cmd = Command::new(binary_path);
237 reth_cmd.args(&reth_args[1..]); reth_cmd
239 };
240
241 cmd.env("RUST_LOG_STYLE", "never");
243
244 cmd
245 }
246
247 pub(crate) async fn start_node(
250 &mut self,
251 binary_path: &std::path::Path,
252 _git_ref: &str,
253 ref_type: &str,
254 additional_args: &[String],
255 ) -> Result<(tokio::process::Child, String)> {
256 self.binary_path = Some(binary_path.to_path_buf());
258
259 let binary_path_str = binary_path.to_string_lossy();
260 let (reth_args, _) = self.build_reth_args(&binary_path_str, additional_args, ref_type);
261
262 let reth_command = shlex::try_join(reth_args.iter().map(|s| s.as_str()))
264 .wrap_err("Failed to format reth command string")?;
265
266 if !self.additional_reth_args.is_empty() {
268 info!("Using common additional reth arguments: {:?}", self.additional_reth_args);
269 }
270 if !additional_args.is_empty() {
271 info!("Using reference-specific additional reth arguments: {:?}", additional_args);
272 }
273
274 let mut cmd = if self.enable_profiling {
275 self.create_profiling_command(ref_type, &reth_args).await?
276 } else {
277 self.create_direct_command(&reth_args)
278 };
279
280 #[cfg(unix)]
282 {
283 cmd.process_group(0);
284 }
285
286 if self.tracing_endpoint.is_some() {
288 cmd.env("OTEL_BSP_MAX_QUEUE_SIZE", self.otlp_max_queue_size.to_string()); cmd.env("OTEL_BLRP_MAX_QUEUE_SIZE", "10000"); cmd.env("OTEL_SERVICE_NAME", format!("reth-{}", ref_type));
293 }
294
295 debug!("Executing reth command: {cmd:?}");
296
297 let mut child = cmd
298 .stdout(std::process::Stdio::piped())
299 .stderr(std::process::Stdio::piped())
300 .kill_on_drop(true) .spawn()
302 .wrap_err("Failed to start reth node")?;
303
304 info!(
305 "Reth node started with PID: {:?} (binary: {})",
306 child.id().ok_or_eyre("Reth node is not running")?,
307 binary_path_str
308 );
309
310 let log_file_path = self.get_log_file_path(ref_type)?;
312 info!("Reth node logs will be saved to: {:?}", log_file_path);
313
314 if let Some(stdout) = child.stdout.take() {
316 let log_file = AsyncFile::create(&log_file_path)
317 .await
318 .wrap_err(format!("Failed to create log file: {:?}", log_file_path))?;
319 tokio::spawn(async move {
320 let reader = AsyncBufReader::new(stdout);
321 let mut lines = reader.lines();
322 let mut log_file = log_file;
323 while let Ok(Some(line)) = lines.next_line().await {
324 debug!("[RETH] {}", line);
325 let log_line = format!("{}\n", line);
327 if let Err(e) = log_file.write_all(log_line.as_bytes()).await {
328 debug!("Failed to write to log file: {}", e);
329 }
330 }
331 });
332 }
333
334 if let Some(stderr) = child.stderr.take() {
335 let log_file = AsyncFile::options()
336 .create(true)
337 .append(true)
338 .open(&log_file_path)
339 .await
340 .wrap_err(format!("Failed to open log file for stderr: {:?}", log_file_path))?;
341 tokio::spawn(async move {
342 let reader = AsyncBufReader::new(stderr);
343 let mut lines = reader.lines();
344 let mut log_file = log_file;
345 while let Ok(Some(line)) = lines.next_line().await {
346 debug!("[RETH] {}", line);
347 let log_line = format!("{}\n", line);
349 if let Err(e) = log_file.write_all(log_line.as_bytes()).await {
350 debug!("Failed to write to log file: {}", e);
351 }
352 }
353 });
354 }
355
356 sleep(Duration::from_secs(5)).await;
358
359 Ok((child, reth_command))
360 }
361
362 pub(crate) async fn wait_for_node_ready_and_get_tip(&self) -> Result<u64> {
364 info!("Waiting for node to be ready and synced...");
365
366 let max_wait = Duration::from_secs(120); let check_interval = Duration::from_secs(2);
368 let rpc_url = "http://localhost:8545";
369
370 let url = rpc_url.parse().map_err(|e| eyre!("Invalid RPC URL '{}': {}", rpc_url, e))?;
372 let provider = ProviderBuilder::new().connect_http(url);
373
374 timeout(max_wait, async {
375 loop {
376 match provider.syncing().await {
378 Ok(sync_result) => {
379 match sync_result {
380 SyncStatus::Info(sync_info) => {
381 debug!("Node is still syncing {sync_info:?}, waiting...");
382 }
383 _ => {
384 match provider.get_block_number().await {
386 Ok(tip) => {
387 info!("Node is ready and not syncing at block: {}", tip);
388 return Ok(tip);
389 }
390 Err(e) => {
391 debug!("Failed to get block number: {}", e);
392 }
393 }
394 }
395 }
396 }
397 Err(e) => {
398 debug!("Node RPC not ready yet or failed to check sync status: {}", e);
399 }
400 }
401
402 sleep(check_interval).await;
403 }
404 })
405 .await
406 .wrap_err("Timed out waiting for node to be ready and synced")?
407 }
408
409 pub(crate) async fn stop_node(&self, child: &mut tokio::process::Child) -> Result<()> {
411 let pid = child.id().expect("Child process ID should be available");
412
413 match child.try_wait() {
415 Ok(Some(status)) => {
416 info!("Reth node (PID: {}) has already exited with status: {:?}", pid, status);
417 return Ok(());
418 }
419 Ok(None) => {
420 info!("Stopping process gracefully with SIGINT (PID: {})...", pid);
422 }
423 Err(e) => {
424 return Err(eyre!("Failed to check process status: {}", e));
425 }
426 }
427
428 #[cfg(unix)]
429 {
430 let nix_pgid = Pid::from_raw(pid as i32);
432
433 match killpg(nix_pgid, Signal::SIGINT) {
434 Ok(()) => {}
435 Err(nix::errno::Errno::ESRCH) => {
436 info!("Process group {} has already exited", pid);
437 }
438 Err(e) => {
439 return Err(eyre!("Failed to send SIGINT to process group {}: {}", pid, e));
440 }
441 }
442 }
443
444 #[cfg(not(unix))]
445 {
446 let output = Command::new("taskkill")
448 .args(["/PID", &pid.to_string(), "/F"])
449 .output()
450 .await
451 .wrap_err("Failed to execute taskkill command")?;
452
453 if !output.status.success() {
454 let stderr = String::from_utf8_lossy(&output.stderr);
455 if stderr.contains("not found") || stderr.contains("not exist") {
457 info!("Process {} has already exited", pid);
458 } else {
459 return Err(eyre!("Failed to kill process {}: {}", pid, stderr));
460 }
461 }
462 }
463
464 match child.wait().await {
466 Ok(status) => {
467 info!("Reth node (PID: {}) exited with status: {:?}", pid, status);
468 }
469 Err(e) => {
470 debug!("Error waiting for process exit (may have already exited): {}", e);
472 }
473 }
474
475 Ok(())
476 }
477
478 pub(crate) async fn unwind_to_block(&self, block_number: u64) -> Result<()> {
480 if self.use_sudo {
481 info!("Unwinding node to block: {} (with sudo)", block_number);
482 } else {
483 info!("Unwinding node to block: {}", block_number);
484 }
485
486 let binary_path = self
488 .binary_path
489 .as_ref()
490 .map(|p| p.to_string_lossy().to_string())
491 .unwrap_or_else(|| "./target/profiling/reth".to_string());
492
493 let mut cmd = if self.use_sudo {
494 let mut sudo_cmd = Command::new("sudo");
495 sudo_cmd.args([&binary_path, "stage", "unwind"]);
496 sudo_cmd
497 } else {
498 let mut reth_cmd = Command::new(&binary_path);
499 reth_cmd.args(["stage", "unwind"]);
500 reth_cmd
501 };
502
503 let chain_str = self.chain.to_string();
505 if chain_str != "mainnet" {
506 cmd.args(["--chain", &chain_str]);
507 }
508
509 if let Some(ref datadir) = self.datadir {
511 cmd.args(["--datadir", datadir]);
512 }
513
514 cmd.args(["to-block", &block_number.to_string()]);
515
516 cmd.env("RUST_LOG_STYLE", "never");
518
519 debug!("Executing reth unwind command: {:?}", cmd);
521
522 let mut child = cmd
523 .stdout(std::process::Stdio::piped())
524 .stderr(std::process::Stdio::piped())
525 .spawn()
526 .wrap_err("Failed to start unwind command")?;
527
528 if let Some(stdout) = child.stdout.take() {
530 tokio::spawn(async move {
531 let reader = AsyncBufReader::new(stdout);
532 let mut lines = reader.lines();
533 while let Ok(Some(line)) = lines.next_line().await {
534 debug!("[RETH-UNWIND] {}", line);
535 }
536 });
537 }
538
539 if let Some(stderr) = child.stderr.take() {
540 tokio::spawn(async move {
541 let reader = AsyncBufReader::new(stderr);
542 let mut lines = reader.lines();
543 while let Ok(Some(line)) = lines.next_line().await {
544 debug!("[RETH-UNWIND] {}", line);
545 }
546 });
547 }
548
549 let status = child.wait().await.wrap_err("Failed to wait for unwind command")?;
551
552 if !status.success() {
553 return Err(eyre!("Unwind command failed with exit code: {:?}", status.code()));
554 }
555
556 info!("Unwound to block: {}", block_number);
557 Ok(())
558 }
559}
560
561fn supports_samply_flags(bin: &str) -> bool {
562 let mut cmd = std::process::Command::new(bin);
563 cmd.args(["--log.samply", "--help"]);
566 debug!(?cmd, "Checking samply flags support");
567 let Ok(output) = cmd.output() else {
568 return false;
569 };
570 debug!(?output, "Samply flags support check");
571 output.status.success()
572}