1#![cfg(feature = "otlp")]
2
3use clap::ValueEnum;
12use eyre::ensure;
13use opentelemetry::{global, trace::TracerProvider, KeyValue, Value};
14use opentelemetry_otlp::{SpanExporter, WithExportConfig};
15use opentelemetry_sdk::{
16 propagation::TraceContextPropagator,
17 trace::{Sampler, SdkTracer, SdkTracerProvider},
18 Resource,
19};
20use opentelemetry_semantic_conventions::{attribute::SERVICE_VERSION, SCHEMA_URL};
21use tracing::Subscriber;
22use tracing_opentelemetry::OpenTelemetryLayer;
23use tracing_subscriber::registry::LookupSpan;
24use url::Url;
25
26use base64::{prelude::BASE64_STANDARD, Engine};
27
28const HTTP_TRACE_ENDPOINT: &str = "/v1/traces";
31const HTTP_LOGS_ENDPOINT: &str = "/v1/logs";
32const OTEL_EXPORTER_OTLP_TRACES_HEADERS: &str = "OTEL_EXPORTER_OTLP_TRACES_HEADERS";
33const OTEL_EXPORTER_OTLP_LOGS_HEADERS: &str = "OTEL_EXPORTER_OTLP_LOGS_HEADERS";
34
35pub fn span_layer<S>(otlp_config: OtlpConfig) -> eyre::Result<OpenTelemetryLayer<S, SdkTracer>>
40where
41 for<'span> S: Subscriber + LookupSpan<'span>,
42{
43 global::set_text_map_propagator(TraceContextPropagator::new());
44
45 let resource =
46 build_resource(otlp_config.service_name.clone(), otlp_config.service_version.as_deref());
47
48 let span_builder = SpanExporter::builder();
49
50 let span_exporter = match otlp_config.protocol {
51 OtlpProtocol::Http => {
52 span_builder.with_http().with_endpoint(otlp_config.endpoint.as_str()).build()?
53 }
54 OtlpProtocol::Grpc => {
55 span_builder.with_tonic().with_endpoint(otlp_config.endpoint.as_str()).build()?
56 }
57 };
58
59 let sampler = build_sampler(otlp_config.sample_ratio)?;
60
61 let tracer_provider = SdkTracerProvider::builder()
62 .with_resource(resource)
63 .with_sampler(sampler)
64 .with_batch_exporter(span_exporter)
65 .build();
66
67 global::set_tracer_provider(tracer_provider.clone());
68
69 let tracer = tracer_provider.tracer(otlp_config.service_name);
70 Ok(tracing_opentelemetry::layer()
71 .with_tracer(tracer)
72 .with_location(false)
73 .with_tracked_inactivity(false)
74 .with_target(false)
75 .with_threads(false))
76}
77
78#[cfg(feature = "otlp-logs")]
82pub fn log_layer(
83 otlp_config: OtlpLogsConfig,
84) -> eyre::Result<
85 opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge<
86 opentelemetry_sdk::logs::SdkLoggerProvider,
87 opentelemetry_sdk::logs::SdkLogger,
88 >,
89> {
90 use opentelemetry_otlp::LogExporter;
91 use opentelemetry_sdk::logs::SdkLoggerProvider;
92
93 let resource =
94 build_resource(otlp_config.service_name.clone(), otlp_config.service_version.as_deref());
95
96 let log_builder = LogExporter::builder();
97
98 let log_exporter = match otlp_config.protocol {
99 OtlpProtocol::Http => {
100 log_builder.with_http().with_endpoint(otlp_config.endpoint.as_str()).build()?
101 }
102 OtlpProtocol::Grpc => {
103 log_builder.with_tonic().with_endpoint(otlp_config.endpoint.as_str()).build()?
104 }
105 };
106
107 let logger_provider = SdkLoggerProvider::builder()
108 .with_resource(resource)
109 .with_batch_exporter(log_exporter)
110 .build();
111
112 Ok(opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge::new(&logger_provider))
113}
114
115#[derive(Debug, Clone)]
117pub struct OtlpConfig {
118 service_name: String,
120 service_version: Option<String>,
122 endpoint: Url,
124 protocol: OtlpProtocol,
126 sample_ratio: Option<f64>,
128}
129
130impl OtlpConfig {
131 pub fn new(
133 service_name: impl Into<String>,
134 endpoint: Url,
135 protocol: OtlpProtocol,
136 sample_ratio: Option<f64>,
137 ) -> eyre::Result<Self> {
138 if let Some(ratio) = sample_ratio {
139 ensure!(
140 (0.0..=1.0).contains(&ratio),
141 "Sample ratio must be between 0.0 and 1.0, got: {}",
142 ratio
143 );
144 }
145
146 set_otlp_auth_header_from_endpoint(&endpoint, OTEL_EXPORTER_OTLP_TRACES_HEADERS)?;
147 Ok(Self {
148 service_name: service_name.into(),
149 service_version: None,
150 endpoint: endpoint_without_credentials(endpoint),
151 protocol,
152 sample_ratio,
153 })
154 }
155
156 pub fn with_service_version(mut self, version: impl Into<String>) -> Self {
158 self.service_version = Some(version.into());
159 self
160 }
161
162 pub fn service_name(&self) -> &str {
164 &self.service_name
165 }
166
167 pub const fn endpoint(&self) -> &Url {
169 &self.endpoint
170 }
171
172 pub const fn protocol(&self) -> OtlpProtocol {
174 self.protocol
175 }
176
177 pub const fn sample_ratio(&self) -> Option<f64> {
179 self.sample_ratio
180 }
181}
182
183#[derive(Debug, Clone)]
185pub struct OtlpLogsConfig {
186 service_name: String,
188 service_version: Option<String>,
190 endpoint: Url,
192 protocol: OtlpProtocol,
194}
195
196impl OtlpLogsConfig {
197 pub fn new(
199 service_name: impl Into<String>,
200 endpoint: Url,
201 protocol: OtlpProtocol,
202 ) -> eyre::Result<Self> {
203 set_otlp_auth_header_from_endpoint(&endpoint, OTEL_EXPORTER_OTLP_LOGS_HEADERS)?;
204 Ok(Self {
205 service_name: service_name.into(),
206 service_version: None,
207 endpoint: endpoint_without_credentials(endpoint),
208 protocol,
209 })
210 }
211
212 pub fn with_service_version(mut self, version: impl Into<String>) -> Self {
214 self.service_version = Some(version.into());
215 self
216 }
217
218 pub fn service_name(&self) -> &str {
220 &self.service_name
221 }
222
223 pub const fn endpoint(&self) -> &Url {
225 &self.endpoint
226 }
227
228 pub const fn protocol(&self) -> OtlpProtocol {
230 self.protocol
231 }
232}
233
234fn build_resource(service_name: impl Into<Value>, service_version: Option<&str>) -> Resource {
236 let version = service_version.unwrap_or(env!("CARGO_PKG_VERSION"));
237 Resource::builder()
238 .with_service_name(service_name)
239 .with_schema_url([KeyValue::new(SERVICE_VERSION, version.to_string())], SCHEMA_URL)
240 .build()
241}
242
243fn endpoint_without_credentials(mut endpoint: Url) -> Url {
245 if !endpoint.username().is_empty() {
246 endpoint.set_username("").ok();
247 }
248 if endpoint.password().is_some() {
249 endpoint.set_password(None).ok();
250 }
251 endpoint
252}
253
254fn otlp_auth_header_from_endpoint(endpoint: &Url) -> eyre::Result<Option<String>> {
256 let username = endpoint.username();
257 if username.is_empty() && endpoint.password().is_none() {
258 return Ok(None);
259 }
260
261 let Some(password) = endpoint.password() else {
262 eyre::bail!("OTLP endpoint credentials must include both username and password");
263 };
264 if username.is_empty() {
265 eyre::bail!("OTLP endpoint credentials must include both username and password");
266 }
267
268 let credentials = format!("{username}:{password}");
269 let encoded = BASE64_STANDARD.encode(credentials.as_bytes());
270 Ok(Some(format!("Authorization=Basic {encoded}")))
271}
272
273fn set_otlp_auth_header_from_endpoint(endpoint: &Url, header_env: &str) -> eyre::Result<()> {
275 if std::env::var_os(header_env).is_some() {
276 return Ok(());
277 }
278
279 let Some(auth_header) = otlp_auth_header_from_endpoint(endpoint)? else {
280 return Ok(());
281 };
282
283 unsafe {
286 std::env::set_var(header_env, auth_header);
287 }
288
289 Ok(())
290}
291
292fn build_sampler(sample_ratio: Option<f64>) -> eyre::Result<Sampler> {
294 match sample_ratio {
295 None | Some(1.0) => Ok(Sampler::ParentBased(Box::new(Sampler::AlwaysOn))),
297 Some(0.0) => Ok(Sampler::ParentBased(Box::new(Sampler::AlwaysOff))),
299 Some(ratio) => Ok(Sampler::ParentBased(Box::new(Sampler::TraceIdRatioBased(ratio)))),
301 }
302}
303
304#[derive(Debug, Clone, Copy, PartialEq, Eq, ValueEnum)]
306pub enum OtlpProtocol {
307 Http,
309 Grpc,
311}
312
313impl OtlpProtocol {
314 pub fn validate_endpoint(&self, url: &mut Url) -> eyre::Result<()> {
319 self.validate_endpoint_with_path(url, HTTP_TRACE_ENDPOINT)
320 }
321
322 pub fn validate_logs_endpoint(&self, url: &mut Url) -> eyre::Result<()> {
327 self.validate_endpoint_with_path(url, HTTP_LOGS_ENDPOINT)
328 }
329
330 fn validate_endpoint_with_path(&self, url: &mut Url, http_path: &str) -> eyre::Result<()> {
331 match self {
332 Self::Http => {
333 if !url.path().ends_with(http_path) {
334 let path = url.path().trim_end_matches('/');
335 url.set_path(&format!("{}{}", path, http_path));
336 }
337 }
338 Self::Grpc => {
339 ensure!(
340 !url.path().ends_with(http_path),
341 "OTLP gRPC endpoint should not include {} path, got: {}",
342 http_path,
343 url
344 );
345 }
346 }
347 Ok(())
348 }
349}
350
351#[cfg(test)]
352mod tests {
353 use super::*;
354
355 #[test]
356 fn strips_credentials_from_trace_endpoint_and_preserves_auth_header() {
357 let config = OtlpConfig::new(
358 "reth",
359 "https://user:pass@example.com/v1/traces".parse().unwrap(),
360 OtlpProtocol::Http,
361 None,
362 )
363 .unwrap();
364
365 assert_eq!(config.endpoint.as_str(), "https://example.com/v1/traces");
366 assert_eq!(
367 otlp_auth_header_from_endpoint(
368 &"https://user:pass@example.com/v1/traces".parse().unwrap()
369 )
370 .unwrap()
371 .as_deref(),
372 Some("Authorization=Basic dXNlcjpwYXNz")
373 );
374 }
375
376 #[test]
377 fn strips_credentials_from_logs_endpoint_and_preserves_auth_header() {
378 let config = OtlpLogsConfig::new(
379 "reth",
380 "https://logs:secret@example.com/v1/logs".parse().unwrap(),
381 OtlpProtocol::Http,
382 )
383 .unwrap();
384
385 assert_eq!(config.endpoint.as_str(), "https://example.com/v1/logs");
386 assert_eq!(
387 otlp_auth_header_from_endpoint(
388 &"https://logs:secret@example.com/v1/logs".parse().unwrap()
389 )
390 .unwrap()
391 .as_deref(),
392 Some("Authorization=Basic bG9nczpzZWNyZXQ=")
393 );
394 }
395
396 #[test]
397 fn leaves_endpoint_without_credentials_unchanged() {
398 let config = OtlpConfig::new(
399 "reth",
400 "https://example.com/v1/traces".parse().unwrap(),
401 OtlpProtocol::Http,
402 None,
403 )
404 .unwrap();
405
406 assert_eq!(config.endpoint.as_str(), "https://example.com/v1/traces");
407 assert_eq!(otlp_auth_header_from_endpoint(config.endpoint()).unwrap(), None);
408 }
409
410 #[test]
411 fn rejects_partial_credentials() {
412 assert!(OtlpConfig::new(
413 "reth",
414 "https://user@example.com/v1/traces".parse().unwrap(),
415 OtlpProtocol::Http,
416 None,
417 )
418 .is_err());
419 }
420}