Skip to main content

reth_tasks/
utils.rs

1//! Task utility functions.
2
3pub use thread_priority::{self, *};
4
/// Executes the supplied closure a single time for each textual call site.
///
/// Every expansion declares its own private `static` [`std::sync::Once`] guard, so separate
/// `once!` invocations never share state, even when they sit in the same function. Passing
/// through the same call site again after the first run does nothing. Handy for one-off thread
/// setup (priority, affinity) on threads that can be re-entered, such as the tokio blocking
/// pool.
#[macro_export]
macro_rules! once {
    ($init:expr) => {{
        static GUARD: std::sync::Once = std::sync::Once::new();
        std::sync::Once::call_once(&GUARD, $init);
    }};
}
17
18/// Increases the current thread's priority.
19///
20/// Tries [`ThreadPriority::Max`] first. If that fails (e.g. missing `CAP_SYS_NICE`),
21/// falls back to a moderate bump via [`ThreadPriority::Crossplatform`] (~5 nice points
22/// on unix). Failures are logged at `debug` level.
23pub fn increase_thread_priority() {
24    let thread_name = std::thread::current().name().unwrap_or("unnamed").to_string();
25    if let Err(err) = ThreadPriority::Max.set_for_current() {
26        tracing::debug!(%thread_name, ?err, "failed to set max thread priority, trying moderate bump; grant CAP_SYS_NICE to the process to enable this");
27        // Crossplatform value 62/99 ≈ nice -5 on unix.
28        let fallback = ThreadPriority::Crossplatform(
29            ThreadPriorityValue::try_from(62u8).expect("62 is within the valid 0..100 range"),
30        );
31        if let Err(err) = fallback.set_for_current() {
32            tracing::debug!(%thread_name, ?err, "failed to set moderate thread priority");
33        }
34    }
35}
36
37/// Deprioritizes known background threads spawned by third-party libraries (`OpenTelemetry`,
38/// `tracing-appender`, `reqwest`) by scanning `/proc/<pid>/task/` for matching thread names and
39/// setting `SCHED_IDLE` scheduling policy + maximum niceness on them.
40///
41/// This is a hack: these threads are spawned by libraries that do not expose a way to hook into
42/// thread initialization or expose the TIDs, so we have to discover them after the fact by
43/// reading `/proc`.
44///
45/// Should be called once after tracing is initialized.
46///
47/// No-op on non-Linux platforms.
48#[allow(clippy::missing_const_for_fn)]
49pub fn deprioritize_background_threads() {
50    #[cfg(target_os = "linux")]
51    _deprioritize_background_threads();
52}
53
/// Name prefixes that identify the third-party threads to deprioritize.
///
/// These are compared against the contents of `/proc/<pid>/task/<tid>/comm`. Linux limits a
/// thread's `comm` name to 15 bytes, and every entry here is exactly 15 bytes long, so the
/// entries are the truncated forms of the libraries' thread names as the kernel reports them.
#[cfg(target_os = "linux")]
const DEPRIORITIZE_THREAD_PREFIXES: &[&str] = &[
    // OpenTelemetry
    "OpenTelemetry.T",
    // tracing-appender
    "tracing-appende",
    // reqwest
    "reqwest-interna",
];
58
59#[cfg(target_os = "linux")]
60fn _deprioritize_background_threads() {
61    let pid = std::process::id();
62    let task_dir = format!("/proc/{pid}/task");
63
64    let entries = match std::fs::read_dir(&task_dir) {
65        Ok(entries) => entries,
66        Err(err) => {
67            tracing::debug!(%err, "failed to read /proc task directory");
68            return;
69        }
70    };
71
72    for entry in entries.filter_map(Result::ok) {
73        let tid_str = entry.file_name();
74        let Some(tid_str) = tid_str.to_str() else { continue };
75        let Ok(tid) = tid_str.parse::<i32>() else { continue };
76
77        let comm_path = format!("{task_dir}/{tid_str}/comm");
78        let comm = match std::fs::read_to_string(&comm_path) {
79            Ok(c) => c,
80            Err(_) => continue,
81        };
82        let comm = comm.trim();
83
84        if !DEPRIORITIZE_THREAD_PREFIXES.iter().any(|prefix| comm.starts_with(prefix)) {
85            continue;
86        }
87
88        // SCHED_IDLE is the lowest-priority scheduling class. The kernel will only schedule these
89        // threads when no other (SCHED_OTHER/SCHED_BATCH/RT) threads need the CPU.
90        // SAFETY: sched_setscheduler is safe to call with a valid TID.
91        unsafe {
92            let param = libc::sched_param { sched_priority: 0 };
93            if libc::sched_setscheduler(tid, libc::SCHED_IDLE, std::ptr::from_ref(&param)) != 0 {
94                tracing::debug!(
95                    tid,
96                    comm,
97                    err = std::io::Error::last_os_error().to_string(),
98                    "failed to set SCHED_IDLE"
99                );
100            }
101        }
102
103        tracing::debug!(tid, comm, "deprioritized background thread (SCHED_IDLE)");
104    }
105}