1use crate::cli::Args;
4use alloy_provider::{Provider, ProviderBuilder};
5use alloy_rpc_types_eth::SyncStatus;
6use eyre::{eyre, OptionExt, Result, WrapErr};
7#[cfg(unix)]
8use nix::sys::signal::{killpg, Signal};
9#[cfg(unix)]
10use nix::unistd::Pid;
11use reth_chainspec::Chain;
12use std::{fs, path::PathBuf, time::Duration};
13use tokio::{
14 fs::File as AsyncFile,
15 io::{AsyncBufReadExt, AsyncWriteExt, BufReader as AsyncBufReader},
16 process::Command,
17 time::{sleep, timeout},
18};
19use tracing::{debug, info, warn};
20
21pub(crate) struct NodeManager {
23 datadir: Option<String>,
24 metrics_port: u16,
25 chain: Chain,
26 use_sudo: bool,
27 binary_path: Option<std::path::PathBuf>,
28 enable_profiling: bool,
29 output_dir: PathBuf,
30 additional_reth_args: Vec<String>,
31 comparison_dir: Option<PathBuf>,
32 tracing_endpoint: Option<String>,
33 otlp_max_queue_size: usize,
34}
35
36impl NodeManager {
37 pub(crate) fn new(args: &Args) -> Self {
39 Self {
40 datadir: Some(args.datadir_path().to_string_lossy().to_string()),
41 metrics_port: args.metrics_port,
42 chain: args.chain,
43 use_sudo: args.sudo,
44 binary_path: None,
45 enable_profiling: args.profile,
46 output_dir: args.output_dir_path(),
47 additional_reth_args: args.reth_args.clone(),
48 comparison_dir: None,
49 tracing_endpoint: args.traces.otlp.as_ref().map(|u| u.to_string()),
50 otlp_max_queue_size: args.otlp_max_queue_size,
51 }
52 }
53
54 pub(crate) fn set_comparison_dir(&mut self, dir: PathBuf) {
56 self.comparison_dir = Some(dir);
57 }
58
59 fn get_log_file_path(&self, ref_type: &str) -> Result<PathBuf> {
61 let comparison_dir = self
62 .comparison_dir
63 .as_ref()
64 .ok_or_eyre("Comparison directory not set. Call set_comparison_dir first.")?;
65
66 let log_dir = comparison_dir.join(ref_type);
68
69 fs::create_dir_all(&log_dir)
71 .wrap_err(format!("Failed to create log directory: {:?}", log_dir))?;
72
73 let log_file = log_dir.join("reth_node.log");
74 Ok(log_file)
75 }
76
77 fn get_perf_sample_rate(&self) -> Option<String> {
79 let perf_rate_file = "/proc/sys/kernel/perf_event_max_sample_rate";
80 if let Ok(content) = fs::read_to_string(perf_rate_file) {
81 let rate_str = content.trim();
82 if !rate_str.is_empty() {
83 if let Ok(system_rate) = rate_str.parse::<u32>() {
84 let capped_rate = std::cmp::min(system_rate, 10000);
85 info!(
86 "Detected perf_event_max_sample_rate: {}, using: {}",
87 system_rate, capped_rate
88 );
89 return Some(capped_rate.to_string());
90 }
91 warn!("Failed to parse perf_event_max_sample_rate: {}", rate_str);
92 }
93 }
94 None
95 }
96
97 async fn get_samply_path(&self) -> Result<String> {
99 let output = Command::new("which")
100 .arg("samply")
101 .output()
102 .await
103 .wrap_err("Failed to execute 'which samply' command")?;
104
105 if !output.status.success() {
106 return Err(eyre!("samply not found in PATH"));
107 }
108
109 let samply_path = String::from_utf8(output.stdout)
110 .wrap_err("samply path is not valid UTF-8")?
111 .trim()
112 .to_string();
113
114 if samply_path.is_empty() {
115 return Err(eyre!("which samply returned empty path"));
116 }
117
118 Ok(samply_path)
119 }
120
121 fn build_reth_args(
123 &self,
124 binary_path_str: &str,
125 additional_args: &[String],
126 ref_type: &str,
127 ) -> (Vec<String>, String) {
128 let mut reth_args = vec![binary_path_str.to_string(), "node".to_string()];
129
130 let chain_str = self.chain.to_string();
132 if chain_str != "mainnet" {
133 reth_args.extend_from_slice(&["--chain".to_string(), chain_str.clone()]);
134 }
135
136 if let Some(ref datadir) = self.datadir {
138 reth_args.extend_from_slice(&["--datadir".to_string(), datadir.clone()]);
139 }
140
141 let metrics_arg = format!("0.0.0.0:{}", self.metrics_port);
143 reth_args.extend_from_slice(&[
144 "--engine.accept-execution-requests-hash".to_string(),
145 "--metrics".to_string(),
146 metrics_arg,
147 "--http".to_string(),
148 "--http.api".to_string(),
149 "eth".to_string(),
150 "--disable-discovery".to_string(),
151 "--trusted-only".to_string(),
152 ]);
153
154 if let Some(ref endpoint) = self.tracing_endpoint {
156 info!("Enabling OTLP tracing export to: {} (service: reth-{})", endpoint, ref_type);
157 reth_args.push(format!("--tracing-otlp={}", endpoint));
159 }
160
161 reth_args.extend_from_slice(&self.additional_reth_args);
164
165 reth_args.extend_from_slice(additional_args);
167
168 (reth_args, chain_str)
169 }
170
171 async fn create_profiling_command(
173 &self,
174 ref_type: &str,
175 reth_args: &[String],
176 ) -> Result<Command> {
177 let profile_dir = self.output_dir.join("profiles");
179 fs::create_dir_all(&profile_dir).wrap_err("Failed to create profiles directory")?;
180
181 let profile_path = profile_dir.join(format!("{}.json.gz", ref_type));
182 info!("Starting reth node with samply profiling...");
183 info!("Profile output: {:?}", profile_path);
184
185 let samply_path = self.get_samply_path().await?;
187
188 let mut cmd = if self.use_sudo {
189 let mut sudo_cmd = Command::new("sudo");
190 sudo_cmd.arg(&samply_path);
191 sudo_cmd
192 } else {
193 Command::new(&samply_path)
194 };
195
196 cmd.args(["record", "--save-only", "-o", &profile_path.to_string_lossy()]);
198
199 if let Some(rate) = self.get_perf_sample_rate() {
201 cmd.args(["--rate", &rate]);
202 }
203
204 cmd.arg("--");
206 cmd.args(reth_args);
207
208 cmd.env("RUST_LOG_STYLE", "never");
210
211 Ok(cmd)
212 }
213
214 fn create_direct_command(&self, reth_args: &[String]) -> Command {
216 let binary_path = &reth_args[0];
217
218 let mut cmd = if self.use_sudo {
219 info!("Starting reth node with sudo...");
220 let mut sudo_cmd = Command::new("sudo");
221 sudo_cmd.args(reth_args);
222 sudo_cmd
223 } else {
224 info!("Starting reth node...");
225 let mut reth_cmd = Command::new(binary_path);
226 reth_cmd.args(&reth_args[1..]); reth_cmd
228 };
229
230 cmd.env("RUST_LOG_STYLE", "never");
232
233 cmd
234 }
235
236 pub(crate) async fn start_node(
238 &mut self,
239 binary_path: &std::path::Path,
240 _git_ref: &str,
241 ref_type: &str,
242 additional_args: &[String],
243 ) -> Result<tokio::process::Child> {
244 self.binary_path = Some(binary_path.to_path_buf());
246
247 let binary_path_str = binary_path.to_string_lossy();
248 let (reth_args, _) = self.build_reth_args(&binary_path_str, additional_args, ref_type);
249
250 if !self.additional_reth_args.is_empty() {
252 info!("Using common additional reth arguments: {:?}", self.additional_reth_args);
253 }
254 if !additional_args.is_empty() {
255 info!("Using reference-specific additional reth arguments: {:?}", additional_args);
256 }
257
258 let mut cmd = if self.enable_profiling {
259 self.create_profiling_command(ref_type, &reth_args).await?
260 } else {
261 self.create_direct_command(&reth_args)
262 };
263
264 #[cfg(unix)]
266 {
267 cmd.process_group(0);
268 }
269
270 if self.tracing_endpoint.is_some() {
272 cmd.env("OTEL_BSP_MAX_QUEUE_SIZE", self.otlp_max_queue_size.to_string()); cmd.env("OTEL_BLRP_MAX_QUEUE_SIZE", "10000"); cmd.env("OTEL_SERVICE_NAME", format!("reth-{}", ref_type));
277 }
278
279 debug!("Executing reth command: {cmd:?}");
280
281 let mut child = cmd
282 .stdout(std::process::Stdio::piped())
283 .stderr(std::process::Stdio::piped())
284 .kill_on_drop(true) .spawn()
286 .wrap_err("Failed to start reth node")?;
287
288 info!(
289 "Reth node started with PID: {:?} (binary: {})",
290 child.id().ok_or_eyre("Reth node is not running")?,
291 binary_path_str
292 );
293
294 let log_file_path = self.get_log_file_path(ref_type)?;
296 info!("Reth node logs will be saved to: {:?}", log_file_path);
297
298 if let Some(stdout) = child.stdout.take() {
300 let log_file = AsyncFile::create(&log_file_path)
301 .await
302 .wrap_err(format!("Failed to create log file: {:?}", log_file_path))?;
303 tokio::spawn(async move {
304 let reader = AsyncBufReader::new(stdout);
305 let mut lines = reader.lines();
306 let mut log_file = log_file;
307 while let Ok(Some(line)) = lines.next_line().await {
308 debug!("[RETH] {}", line);
309 let log_line = format!("{}\n", line);
311 if let Err(e) = log_file.write_all(log_line.as_bytes()).await {
312 debug!("Failed to write to log file: {}", e);
313 }
314 }
315 });
316 }
317
318 if let Some(stderr) = child.stderr.take() {
319 let log_file = AsyncFile::options()
320 .create(true)
321 .append(true)
322 .open(&log_file_path)
323 .await
324 .wrap_err(format!("Failed to open log file for stderr: {:?}", log_file_path))?;
325 tokio::spawn(async move {
326 let reader = AsyncBufReader::new(stderr);
327 let mut lines = reader.lines();
328 let mut log_file = log_file;
329 while let Ok(Some(line)) = lines.next_line().await {
330 debug!("[RETH] {}", line);
331 let log_line = format!("{}\n", line);
333 if let Err(e) = log_file.write_all(log_line.as_bytes()).await {
334 debug!("Failed to write to log file: {}", e);
335 }
336 }
337 });
338 }
339
340 sleep(Duration::from_secs(5)).await;
342
343 Ok(child)
344 }
345
346 pub(crate) async fn wait_for_node_ready_and_get_tip(&self) -> Result<u64> {
348 info!("Waiting for node to be ready and synced...");
349
350 let max_wait = Duration::from_secs(120); let check_interval = Duration::from_secs(2);
352 let rpc_url = "http://localhost:8545";
353
354 let url = rpc_url.parse().map_err(|e| eyre!("Invalid RPC URL '{}': {}", rpc_url, e))?;
356 let provider = ProviderBuilder::new().connect_http(url);
357
358 timeout(max_wait, async {
359 loop {
360 match provider.syncing().await {
362 Ok(sync_result) => {
363 match sync_result {
364 SyncStatus::Info(sync_info) => {
365 debug!("Node is still syncing {sync_info:?}, waiting...");
366 }
367 _ => {
368 match provider.get_block_number().await {
370 Ok(tip) => {
371 info!("Node is ready and not syncing at block: {}", tip);
372 return Ok(tip);
373 }
374 Err(e) => {
375 debug!("Failed to get block number: {}", e);
376 }
377 }
378 }
379 }
380 }
381 Err(e) => {
382 debug!("Node RPC not ready yet or failed to check sync status: {}", e);
383 }
384 }
385
386 sleep(check_interval).await;
387 }
388 })
389 .await
390 .wrap_err("Timed out waiting for node to be ready and synced")?
391 }
392
393 pub(crate) async fn stop_node(&self, child: &mut tokio::process::Child) -> Result<()> {
395 let pid = child.id().expect("Child process ID should be available");
396
397 match child.try_wait() {
399 Ok(Some(status)) => {
400 info!("Reth node (PID: {}) has already exited with status: {:?}", pid, status);
401 return Ok(());
402 }
403 Ok(None) => {
404 info!("Stopping process gracefully with SIGINT (PID: {})...", pid);
406 }
407 Err(e) => {
408 return Err(eyre!("Failed to check process status: {}", e));
409 }
410 }
411
412 #[cfg(unix)]
413 {
414 let nix_pgid = Pid::from_raw(pid as i32);
416
417 match killpg(nix_pgid, Signal::SIGINT) {
418 Ok(()) => {}
419 Err(nix::errno::Errno::ESRCH) => {
420 info!("Process group {} has already exited", pid);
421 }
422 Err(e) => {
423 return Err(eyre!("Failed to send SIGINT to process group {}: {}", pid, e));
424 }
425 }
426 }
427
428 #[cfg(not(unix))]
429 {
430 let output = Command::new("taskkill")
432 .args(["/PID", &pid.to_string(), "/F"])
433 .output()
434 .await
435 .wrap_err("Failed to execute taskkill command")?;
436
437 if !output.status.success() {
438 let stderr = String::from_utf8_lossy(&output.stderr);
439 if stderr.contains("not found") || stderr.contains("not exist") {
441 info!("Process {} has already exited", pid);
442 } else {
443 return Err(eyre!("Failed to kill process {}: {}", pid, stderr));
444 }
445 }
446 }
447
448 match child.wait().await {
450 Ok(status) => {
451 info!("Reth node (PID: {}) exited with status: {:?}", pid, status);
452 }
453 Err(e) => {
454 debug!("Error waiting for process exit (may have already exited): {}", e);
456 }
457 }
458
459 Ok(())
460 }
461
462 pub(crate) async fn unwind_to_block(&self, block_number: u64) -> Result<()> {
464 if self.use_sudo {
465 info!("Unwinding node to block: {} (with sudo)", block_number);
466 } else {
467 info!("Unwinding node to block: {}", block_number);
468 }
469
470 let binary_path = self
472 .binary_path
473 .as_ref()
474 .map(|p| p.to_string_lossy().to_string())
475 .unwrap_or_else(|| "./target/profiling/reth".to_string());
476
477 let mut cmd = if self.use_sudo {
478 let mut sudo_cmd = Command::new("sudo");
479 sudo_cmd.args([&binary_path, "stage", "unwind"]);
480 sudo_cmd
481 } else {
482 let mut reth_cmd = Command::new(&binary_path);
483 reth_cmd.args(["stage", "unwind"]);
484 reth_cmd
485 };
486
487 let chain_str = self.chain.to_string();
489 if chain_str != "mainnet" {
490 cmd.args(["--chain", &chain_str]);
491 }
492
493 if let Some(ref datadir) = self.datadir {
495 cmd.args(["--datadir", datadir]);
496 }
497
498 cmd.args(["to-block", &block_number.to_string()]);
499
500 cmd.env("RUST_LOG_STYLE", "never");
502
503 debug!("Executing reth unwind command: {:?}", cmd);
505
506 let mut child = cmd
507 .stdout(std::process::Stdio::piped())
508 .stderr(std::process::Stdio::piped())
509 .spawn()
510 .wrap_err("Failed to start unwind command")?;
511
512 if let Some(stdout) = child.stdout.take() {
514 tokio::spawn(async move {
515 let reader = AsyncBufReader::new(stdout);
516 let mut lines = reader.lines();
517 while let Ok(Some(line)) = lines.next_line().await {
518 debug!("[RETH-UNWIND] {}", line);
519 }
520 });
521 }
522
523 if let Some(stderr) = child.stderr.take() {
524 tokio::spawn(async move {
525 let reader = AsyncBufReader::new(stderr);
526 let mut lines = reader.lines();
527 while let Ok(Some(line)) = lines.next_line().await {
528 debug!("[RETH-UNWIND] {}", line);
529 }
530 });
531 }
532
533 let status = child.wait().await.wrap_err("Failed to wait for unwind command")?;
535
536 if !status.success() {
537 return Err(eyre!("Unwind command failed with exit code: {:?}", status.code()));
538 }
539
540 info!("Unwound to block: {}", block_number);
541 Ok(())
542 }
543}