Skip to main content

reth_tracing_otlp/
lib.rs

1#![cfg(feature = "otlp")]
2
3//! Provides tracing layers for `OpenTelemetry` that export spans, logs, and metrics to an OTLP
4//! endpoint.
5//!
6//! This module simplifies the integration of `OpenTelemetry` with OTLP export in Rust
7//! applications. It allows for easily capturing and exporting distributed traces, logs,
8//! and metrics to compatible backends like Jaeger, Zipkin, or any other
9//! OpenTelemetry-compatible system.
10
11use clap::ValueEnum;
12use eyre::ensure;
13use opentelemetry::{global, trace::TracerProvider, KeyValue, Value};
14use opentelemetry_otlp::{SpanExporter, WithExportConfig};
15use opentelemetry_sdk::{
16    propagation::TraceContextPropagator,
17    trace::{Sampler, SdkTracer, SdkTracerProvider},
18    Resource,
19};
20use opentelemetry_semantic_conventions::{attribute::SERVICE_VERSION, SCHEMA_URL};
21use tracing::Subscriber;
22use tracing_opentelemetry::OpenTelemetryLayer;
23use tracing_subscriber::registry::LookupSpan;
24use url::Url;
25
26use base64::{prelude::BASE64_STANDARD, Engine};
27
// Otlp http endpoint is expected to end with this path.
// See also <https://opentelemetry.io/docs/languages/sdk-configuration/otlp-exporter/#otel_exporter_otlp_traces_endpoint>.
/// Path suffix that an OTLP/HTTP traces endpoint is validated against.
const HTTP_TRACE_ENDPOINT: &str = "/v1/traces";
/// Path suffix that an OTLP/HTTP logs endpoint is validated against.
const HTTP_LOGS_ENDPOINT: &str = "/v1/logs";
/// Name of the environment variable that receives the `Authorization` header derived from
/// trace endpoint credentials.
const OTEL_EXPORTER_OTLP_TRACES_HEADERS: &str = "OTEL_EXPORTER_OTLP_TRACES_HEADERS";
/// Name of the environment variable that receives the `Authorization` header derived from
/// logs endpoint credentials.
const OTEL_EXPORTER_OTLP_LOGS_HEADERS: &str = "OTEL_EXPORTER_OTLP_LOGS_HEADERS";
34
35/// Creates a tracing [`OpenTelemetryLayer`] that exports spans to an OTLP endpoint.
36///
37/// This layer can be added to a [`tracing_subscriber::Registry`] to enable `OpenTelemetry` tracing
38/// with OTLP export to an url.
39pub fn span_layer<S>(otlp_config: OtlpConfig) -> eyre::Result<OpenTelemetryLayer<S, SdkTracer>>
40where
41    for<'span> S: Subscriber + LookupSpan<'span>,
42{
43    global::set_text_map_propagator(TraceContextPropagator::new());
44
45    let resource =
46        build_resource(otlp_config.service_name.clone(), otlp_config.service_version.as_deref());
47
48    let span_builder = SpanExporter::builder();
49
50    let span_exporter = match otlp_config.protocol {
51        OtlpProtocol::Http => {
52            span_builder.with_http().with_endpoint(otlp_config.endpoint.as_str()).build()?
53        }
54        OtlpProtocol::Grpc => {
55            span_builder.with_tonic().with_endpoint(otlp_config.endpoint.as_str()).build()?
56        }
57    };
58
59    let sampler = build_sampler(otlp_config.sample_ratio)?;
60
61    let tracer_provider = SdkTracerProvider::builder()
62        .with_resource(resource)
63        .with_sampler(sampler)
64        .with_batch_exporter(span_exporter)
65        .build();
66
67    global::set_tracer_provider(tracer_provider.clone());
68
69    let tracer = tracer_provider.tracer(otlp_config.service_name);
70    Ok(tracing_opentelemetry::layer()
71        .with_tracer(tracer)
72        .with_location(false)
73        .with_tracked_inactivity(false)
74        .with_target(false)
75        .with_threads(false))
76}
77
/// Builds a tracing layer that forwards `tracing` events to an OTLP endpoint as `OpenTelemetry`
/// logs.
///
/// # Errors
///
/// Returns an error if the log exporter cannot be built for the configured protocol and
/// endpoint.
#[cfg(feature = "otlp-logs")]
pub fn log_layer(
    otlp_config: OtlpLogsConfig,
) -> eyre::Result<
    opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge<
        opentelemetry_sdk::logs::SdkLoggerProvider,
        opentelemetry_sdk::logs::SdkLogger,
    >,
> {
    use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
    use opentelemetry_otlp::LogExporter;
    use opentelemetry_sdk::logs::SdkLoggerProvider;

    let OtlpLogsConfig { service_name, service_version, endpoint, protocol } = otlp_config;

    let resource = build_resource(service_name, service_version.as_deref());

    // The HTTP and gRPC builders are distinct types, so each arm finishes its own build.
    let exporter = match protocol {
        OtlpProtocol::Grpc => {
            LogExporter::builder().with_tonic().with_endpoint(endpoint.as_str()).build()?
        }
        OtlpProtocol::Http => {
            LogExporter::builder().with_http().with_endpoint(endpoint.as_str()).build()?
        }
    };

    let provider =
        SdkLoggerProvider::builder().with_resource(resource).with_batch_exporter(exporter).build();

    Ok(OpenTelemetryTracingBridge::new(&provider))
}
114
115/// Configuration for OTLP trace export.
116#[derive(Debug, Clone)]
117pub struct OtlpConfig {
118    /// Service name for trace identification
119    service_name: String,
120    /// Optional service version override. Falls back to `CARGO_PKG_VERSION` if `None`.
121    service_version: Option<String>,
122    /// Otlp endpoint URL
123    endpoint: Url,
124    /// Transport protocol, HTTP or gRPC
125    protocol: OtlpProtocol,
126    /// Optional sampling ratio, from 0.0 to 1.0
127    sample_ratio: Option<f64>,
128}
129
130impl OtlpConfig {
131    /// Creates a new OTLP configuration.
132    pub fn new(
133        service_name: impl Into<String>,
134        endpoint: Url,
135        protocol: OtlpProtocol,
136        sample_ratio: Option<f64>,
137    ) -> eyre::Result<Self> {
138        if let Some(ratio) = sample_ratio {
139            ensure!(
140                (0.0..=1.0).contains(&ratio),
141                "Sample ratio must be between 0.0 and 1.0, got: {}",
142                ratio
143            );
144        }
145
146        set_otlp_auth_header_from_endpoint(&endpoint, OTEL_EXPORTER_OTLP_TRACES_HEADERS)?;
147        Ok(Self {
148            service_name: service_name.into(),
149            service_version: None,
150            endpoint: endpoint_without_credentials(endpoint),
151            protocol,
152            sample_ratio,
153        })
154    }
155
156    /// Sets the service version for OTLP resource identification.
157    pub fn with_service_version(mut self, version: impl Into<String>) -> Self {
158        self.service_version = Some(version.into());
159        self
160    }
161
162    /// Returns the service name.
163    pub fn service_name(&self) -> &str {
164        &self.service_name
165    }
166
167    /// Returns the OTLP endpoint URL.
168    pub const fn endpoint(&self) -> &Url {
169        &self.endpoint
170    }
171
172    /// Returns the transport protocol.
173    pub const fn protocol(&self) -> OtlpProtocol {
174        self.protocol
175    }
176
177    /// Returns the sampling ratio.
178    pub const fn sample_ratio(&self) -> Option<f64> {
179        self.sample_ratio
180    }
181}
182
183/// Configuration for OTLP logs export.
184#[derive(Debug, Clone)]
185pub struct OtlpLogsConfig {
186    /// Service name for log identification
187    service_name: String,
188    /// Optional service version override. Falls back to `CARGO_PKG_VERSION` if `None`.
189    service_version: Option<String>,
190    /// Otlp endpoint URL
191    endpoint: Url,
192    /// Transport protocol, HTTP or gRPC
193    protocol: OtlpProtocol,
194}
195
196impl OtlpLogsConfig {
197    /// Creates a new OTLP logs configuration.
198    pub fn new(
199        service_name: impl Into<String>,
200        endpoint: Url,
201        protocol: OtlpProtocol,
202    ) -> eyre::Result<Self> {
203        set_otlp_auth_header_from_endpoint(&endpoint, OTEL_EXPORTER_OTLP_LOGS_HEADERS)?;
204        Ok(Self {
205            service_name: service_name.into(),
206            service_version: None,
207            endpoint: endpoint_without_credentials(endpoint),
208            protocol,
209        })
210    }
211
212    /// Sets the service version for OTLP resource identification.
213    pub fn with_service_version(mut self, version: impl Into<String>) -> Self {
214        self.service_version = Some(version.into());
215        self
216    }
217
218    /// Returns the service name.
219    pub fn service_name(&self) -> &str {
220        &self.service_name
221    }
222
223    /// Returns the OTLP endpoint URL.
224    pub const fn endpoint(&self) -> &Url {
225        &self.endpoint
226    }
227
228    /// Returns the transport protocol.
229    pub const fn protocol(&self) -> OtlpProtocol {
230        self.protocol
231    }
232}
233
234// Builds OTLP resource with service information.
235fn build_resource(service_name: impl Into<Value>, service_version: Option<&str>) -> Resource {
236    let version = service_version.unwrap_or(env!("CARGO_PKG_VERSION"));
237    Resource::builder()
238        .with_service_name(service_name)
239        .with_schema_url([KeyValue::new(SERVICE_VERSION, version.to_string())], SCHEMA_URL)
240        .build()
241}
242
243/// Returns an OTLP endpoint URL with username and password removed.
244fn endpoint_without_credentials(mut endpoint: Url) -> Url {
245    if !endpoint.username().is_empty() {
246        endpoint.set_username("").ok();
247    }
248    if endpoint.password().is_some() {
249        endpoint.set_password(None).ok();
250    }
251    endpoint
252}
253
254/// Builds an OTLP `Authorization` header from endpoint `username:password` credentials.
255fn otlp_auth_header_from_endpoint(endpoint: &Url) -> eyre::Result<Option<String>> {
256    let username = endpoint.username();
257    if username.is_empty() && endpoint.password().is_none() {
258        return Ok(None);
259    }
260
261    let Some(password) = endpoint.password() else {
262        eyre::bail!("OTLP endpoint credentials must include both username and password");
263    };
264    if username.is_empty() {
265        eyre::bail!("OTLP endpoint credentials must include both username and password");
266    }
267
268    let credentials = format!("{username}:{password}");
269    let encoded = BASE64_STANDARD.encode(credentials.as_bytes());
270    Ok(Some(format!("Authorization=Basic {encoded}")))
271}
272
273/// Sets the OTLP signal-specific header env var from endpoint credentials if it is unset.
274fn set_otlp_auth_header_from_endpoint(endpoint: &Url, header_env: &str) -> eyre::Result<()> {
275    if std::env::var_os(header_env).is_some() {
276        return Ok(());
277    }
278
279    let Some(auth_header) = otlp_auth_header_from_endpoint(endpoint)? else {
280        return Ok(());
281    };
282
283    // SAFETY: OTLP exporters are initialized during process startup before exporter worker threads
284    // are spawned.
285    unsafe {
286        std::env::set_var(header_env, auth_header);
287    }
288
289    Ok(())
290}
291
292/// Builds the appropriate sampler based on the sample ratio.
293fn build_sampler(sample_ratio: Option<f64>) -> eyre::Result<Sampler> {
294    match sample_ratio {
295        // Default behavior: sample all traces
296        None | Some(1.0) => Ok(Sampler::ParentBased(Box::new(Sampler::AlwaysOn))),
297        // Don't sample anything
298        Some(0.0) => Ok(Sampler::ParentBased(Box::new(Sampler::AlwaysOff))),
299        // Sample based on trace ID ratio
300        Some(ratio) => Ok(Sampler::ParentBased(Box::new(Sampler::TraceIdRatioBased(ratio)))),
301    }
302}
303
304/// OTLP transport protocol type
305#[derive(Debug, Clone, Copy, PartialEq, Eq, ValueEnum)]
306pub enum OtlpProtocol {
307    /// HTTP/Protobuf transport, port 4318, requires `/v1/traces` path
308    Http,
309    /// gRPC transport, port 4317
310    Grpc,
311}
312
313impl OtlpProtocol {
314    /// Validate and correct the URL to match protocol requirements for traces.
315    ///
316    /// For HTTP: Ensures the path ends with `/v1/traces`, appending it if necessary.
317    /// For gRPC: Ensures the path does NOT include `/v1/traces`.
318    pub fn validate_endpoint(&self, url: &mut Url) -> eyre::Result<()> {
319        self.validate_endpoint_with_path(url, HTTP_TRACE_ENDPOINT)
320    }
321
322    /// Validate and correct the URL to match protocol requirements for logs.
323    ///
324    /// For HTTP: Ensures the path ends with `/v1/logs`, appending it if necessary.
325    /// For gRPC: Ensures the path does NOT include `/v1/logs`.
326    pub fn validate_logs_endpoint(&self, url: &mut Url) -> eyre::Result<()> {
327        self.validate_endpoint_with_path(url, HTTP_LOGS_ENDPOINT)
328    }
329
330    fn validate_endpoint_with_path(&self, url: &mut Url, http_path: &str) -> eyre::Result<()> {
331        match self {
332            Self::Http => {
333                if !url.path().ends_with(http_path) {
334                    let path = url.path().trim_end_matches('/');
335                    url.set_path(&format!("{}{}", path, http_path));
336                }
337            }
338            Self::Grpc => {
339                ensure!(
340                    !url.path().ends_with(http_path),
341                    "OTLP gRPC endpoint should not include {} path, got: {}",
342                    http_path,
343                    url
344                );
345            }
346        }
347        Ok(())
348    }
349}
350
#[cfg(test)]
mod tests {
    use super::*;

    /// Credentials are removed from the stored trace endpoint, and the helper derives the
    /// expected Basic auth header from the same URL.
    #[test]
    fn strips_credentials_from_trace_endpoint_and_preserves_auth_header() {
        let config = OtlpConfig::new(
            "reth",
            "https://user:pass@example.com/v1/traces".parse().unwrap(),
            OtlpProtocol::Http,
            None,
        )
        .unwrap();

        assert_eq!(config.endpoint.as_str(), "https://example.com/v1/traces");
        assert_eq!(
            otlp_auth_header_from_endpoint(
                &"https://user:pass@example.com/v1/traces".parse().unwrap()
            )
            .unwrap()
            .as_deref(),
            Some("Authorization=Basic dXNlcjpwYXNz")
        );
    }

    /// Credentials are removed from the stored logs endpoint, and the helper derives the
    /// expected Basic auth header from the same URL.
    #[test]
    fn strips_credentials_from_logs_endpoint_and_preserves_auth_header() {
        let config = OtlpLogsConfig::new(
            "reth",
            "https://logs:secret@example.com/v1/logs".parse().unwrap(),
            OtlpProtocol::Http,
        )
        .unwrap();

        assert_eq!(config.endpoint.as_str(), "https://example.com/v1/logs");
        assert_eq!(
            otlp_auth_header_from_endpoint(
                &"https://logs:secret@example.com/v1/logs".parse().unwrap()
            )
            .unwrap()
            .as_deref(),
            Some("Authorization=Basic bG9nczpzZWNyZXQ=")
        );
    }

    /// An endpoint without credentials is stored unchanged and yields no auth header.
    #[test]
    fn leaves_endpoint_without_credentials_unchanged() {
        let config = OtlpConfig::new(
            "reth",
            "https://example.com/v1/traces".parse().unwrap(),
            OtlpProtocol::Http,
            None,
        )
        .unwrap();

        assert_eq!(config.endpoint.as_str(), "https://example.com/v1/traces");
        assert_eq!(otlp_auth_header_from_endpoint(config.endpoint()).unwrap(), None);
    }

    /// A username without a password, or a password without a username, is rejected.
    #[test]
    fn rejects_partial_credentials() {
        // `OtlpConfig::new` also consults process-wide environment state that sibling tests
        // mutate, so the pure helper is asserted on directly to keep this test independent of
        // test ordering and of the caller's environment.
        let username_only: Url = "https://user@example.com/v1/traces".parse().unwrap();
        assert!(otlp_auth_header_from_endpoint(&username_only).is_err());

        let password_only: Url = "https://:pass@example.com/v1/traces".parse().unwrap();
        assert!(otlp_auth_header_from_endpoint(&password_only).is_err());
    }
}