stratus/infra/tracing/
tracing_config.rs

1use std::collections::HashMap;
2use std::io::IsTerminal;
3use std::io::stdout;
4use std::str::FromStr;
5
6use anyhow::anyhow;
7use clap::Parser;
8use display_json::DebugAsJson;
9use http::HeaderMap;
10use http::header::HeaderName;
11use http::header::HeaderValue;
12use itertools::Itertools;
13use opentelemetry::KeyValue;
14use opentelemetry::trace::TracerProvider;
15use opentelemetry_otlp::Protocol;
16use opentelemetry_otlp::WithExportConfig;
17use opentelemetry_otlp::WithHttpConfig;
18use opentelemetry_otlp::WithTonicConfig;
19use opentelemetry_otlp::tonic_types::metadata::MetadataMap;
20use opentelemetry_sdk::Resource as SdkResource;
21use opentelemetry_sdk::trace::BatchConfigBuilder;
22use opentelemetry_sdk::trace::SdkTracerProvider;
23use opentelemetry_sdk::trace::Tracer as SdkTracer;
24use tracing_subscriber::EnvFilter;
25use tracing_subscriber::Layer;
26use tracing_subscriber::filter::LevelFilter;
27use tracing_subscriber::fmt;
28use tracing_subscriber::layer::SubscriberExt;
29use tracing_subscriber::util::SubscriberInitExt;
30
31use crate::infra::build_info;
32use crate::infra::sentry::SentryConfig;
33use crate::infra::tracing::TracingContextLayer;
34use crate::infra::tracing::TracingJsonFormatter;
35use crate::infra::tracing::TracingMinimalTimer;
36
37// -----------------------------------------------------------------------------
38// Config
39// -----------------------------------------------------------------------------
40
// Command-line / environment configuration for the tracing stack: the console
// log format and the optional OpenTelemetry exporter.
//
// NOTE(review): maintainer notes in this struct are written as plain `//`
// comments on purpose. The struct derives `clap::Parser`, and clap's derive is
// expected to surface `///` comments as user-facing help text, so those lines
// are left untouched -- confirm before rewording them.
#[derive(DebugAsJson, Clone, Parser, serde::Serialize)]
pub struct TracingConfig {
    // When `None`, `create_subscriber` skips the OpenTelemetry layer entirely.
    /// OpenTelemetry server URL.
    #[arg(long = "tracing-url", alias = "tracing-collector-url", env = "TRACING_URL")]
    pub tracing_url: Option<String>,

    // Parsed through `TracingProtocol::from_str`; defaults to `grpc`.
    /// OpenTelemetry server communication protocol.
    #[arg(long = "tracing-protocol", env = "TRACING_PROTOCOL", default_value = "grpc")]
    pub tracing_protocol: TracingProtocol,

    // Comma-separated list; `opentelemetry_tracer` splits each entry on its
    // first `=` into a key and a value.
    /// OpenTelemetry additional HTTP headers or GRPC metadata.
    #[arg(long = "tracing-headers", env = "TRACING_HEADERS", value_delimiter = ',')]
    pub tracing_headers: Vec<String>,

    // Parsed through `TracingLogFormat::from_str`; defaults to `normal`.
    /// How tracing events will be formatted when displayed in stdout.
    #[arg(long = "tracing-log-format", env = "TRACING_LOG_FORMAT", default_value = "normal")]
    pub tracing_log_format: TracingLogFormat,
}
59
60impl TracingConfig {
61    /// Inits application global tracing registry.
62    ///
63    /// Uses println! to have information available in stdout before tracing is initialized.
64    pub fn init(&self, sentry_config: &Option<SentryConfig>) -> anyhow::Result<()> {
65        match self.create_subscriber(sentry_config).try_init() {
66            Ok(()) => Ok(()),
67            Err(e) => {
68                println!("failed to create tracing registry | reason={e:?}");
69                Err(e.into())
70            }
71        }
72    }
73    pub fn create_subscriber(&self, sentry_config: &Option<SentryConfig>) -> impl SubscriberInitExt {
74        println!("creating tracing registry");
75
76        // configure tracing context layer
77        println!("tracing registry: enabling tracing context recorder");
78        let tracing_context_layer = TracingContextLayer.with_filter(EnvFilter::from_default_env());
79
80        // configure stdout log layer
81        let enable_ansi = stdout().is_terminal();
82        println!(
83            "tracing registry: enabling console logs | format={} ansi={}",
84            self.tracing_log_format, enable_ansi
85        );
86        let stdout_layer = match self.tracing_log_format {
87            TracingLogFormat::Json => fmt::Layer::default()
88                .event_format(TracingJsonFormatter)
89                .with_filter(EnvFilter::from_default_env())
90                .boxed(),
91            TracingLogFormat::Minimal => fmt::Layer::default()
92                .with_thread_ids(false)
93                .with_thread_names(false)
94                .with_target(false)
95                .with_ansi(enable_ansi)
96                .with_timer(TracingMinimalTimer)
97                .with_filter(EnvFilter::from_default_env())
98                .boxed(),
99            TracingLogFormat::Normal => fmt::Layer::default().with_ansi(enable_ansi).with_filter(EnvFilter::from_default_env()).boxed(),
100            TracingLogFormat::Verbose => fmt::Layer::default()
101                .with_ansi(enable_ansi)
102                .with_target(true)
103                .with_thread_ids(true)
104                .with_thread_names(true)
105                .with_filter(EnvFilter::from_default_env())
106                .boxed(),
107        };
108
109        // configure opentelemetry layer
110        let opentelemetry_layer = match &self.tracing_url {
111            Some(url) => {
112                let tracer = opentelemetry_tracer(url, self.tracing_protocol, &self.tracing_headers);
113                let layer = tracing_opentelemetry::layer()
114                    .with_tracked_inactivity(false)
115                    .with_tracer(tracer)
116                    .with_filter(EnvFilter::from_default_env());
117                Some(layer)
118            }
119            None => {
120                println!("tracing registry: skipping opentelemetry exporter");
121                None
122            }
123        };
124
125        let sentry_layer = match &sentry_config {
126            Some(sentry_config) => {
127                println!("tracing registry: enabling sentry exporter | url={}", sentry_config.sentry_url);
128                let layer = sentry_tracing::layer()
129                    .event_filter(crate::infra::sentry::sentry_event_filter)
130                    .span_filter(crate::infra::sentry::sentry_span_filter)
131                    .with_filter(LevelFilter::ERROR)
132                    .with_filter(EnvFilter::from_default_env());
133                Some(layer)
134            }
135            None => {
136                println!("tracing registry: skipping sentry exporter");
137                None
138            }
139        };
140
141        tracing_subscriber::registry()
142            .with(tracing_context_layer)
143            .with(stdout_layer)
144            .with(opentelemetry_layer)
145            .with(sentry_layer)
146    }
147}
148
149fn opentelemetry_tracer(url: &str, protocol: TracingProtocol, headers: &[String]) -> SdkTracer {
150    println!(
151        "tracing registry: enabling opentelemetry exporter | url={} protocol={} headers={} service={}",
152        url,
153        protocol,
154        headers.len(),
155        build_info::service_name()
156    );
157
158    // configure headers
159    let headers = headers
160        .iter()
161        .map(|header| {
162            let mut parts = header.splitn(2, '=');
163            let key = parts.next().unwrap();
164            let value = parts.next().unwrap_or_default();
165            (key, value)
166        })
167        .collect_vec();
168
169    let resource = SdkResource::builder()
170        .with_attributes([KeyValue::new("service.name", build_info::service_name())])
171        .build();
172
173    // configure tracer
174    match protocol {
175        TracingProtocol::Grpc => {
176            let mut http_metadata = HeaderMap::new();
177            for (key, value) in headers {
178                let header_name = HeaderName::from_str(key).unwrap();
179                let header_value = HeaderValue::from_str(value).unwrap();
180                http_metadata.insert(header_name, header_value);
181            }
182
183            let protocol_metadata = MetadataMap::from_headers(http_metadata);
184
185            let exporter = opentelemetry_otlp::SpanExporter::builder()
186                .with_tonic()
187                .with_endpoint(url)
188                .with_metadata(protocol_metadata)
189                .build()
190                .unwrap();
191
192            let batch_config = BatchConfigBuilder::default().with_max_queue_size(u16::MAX as usize).build();
193            let batch_processor = opentelemetry_sdk::trace::BatchSpanProcessor::builder(exporter)
194                .with_batch_config(batch_config)
195                .build();
196
197            SdkTracerProvider::builder()
198                .with_resource(resource.clone())
199                .with_span_processor(batch_processor)
200                .build()
201                .tracer(build_info::binary_name())
202        }
203        TracingProtocol::HttpBinary | TracingProtocol::HttpJson => {
204            let mut protocol_headers = HashMap::new();
205            for (key, value) in headers {
206                protocol_headers.insert(key.to_owned(), value.to_owned());
207            }
208
209            let exporter = opentelemetry_otlp::SpanExporter::builder()
210                .with_http()
211                .with_endpoint(url)
212                .with_headers(protocol_headers)
213                .build()
214                .unwrap();
215
216            let batch_config = BatchConfigBuilder::default().with_max_queue_size(u16::MAX as usize).build();
217            let batch_processor = opentelemetry_sdk::trace::BatchSpanProcessor::builder(exporter)
218                .with_batch_config(batch_config)
219                .build();
220
221            SdkTracerProvider::builder()
222                .with_resource(resource.clone())
223                .with_span_processor(batch_processor)
224                .build()
225                .tracer(build_info::binary_name())
226        }
227    }
228}
229
230// -----------------------------------------------------------------------------
231// Protocol
232// -----------------------------------------------------------------------------
233
/// Protocol used to talk to the OpenTelemetry server.
///
/// The serde and `Display` representations are the lowercase names `grpc`,
/// `http-binary` and `http-json`, which are the same strings accepted by the
/// `FromStr` implementation.
#[derive(DebugAsJson, strum::Display, Clone, Copy, Eq, PartialEq, serde::Serialize)]
pub enum TracingProtocol {
    /// gRPC; `opentelemetry_tracer` uses the tonic-based exporter for this variant.
    #[serde(rename = "grpc")]
    #[strum(to_string = "grpc")]
    Grpc,

    /// HTTP exporter; converts to `Protocol::HttpBinary`.
    #[serde(rename = "http-binary")]
    #[strum(to_string = "http-binary")]
    HttpBinary,

    /// HTTP exporter; converts to `Protocol::HttpJson`.
    #[serde(rename = "http-json")]
    #[strum(to_string = "http-json")]
    HttpJson,
}
248
249impl FromStr for TracingProtocol {
250    type Err = anyhow::Error;
251
252    fn from_str(s: &str) -> anyhow::Result<Self, Self::Err> {
253        match s.to_lowercase().trim() {
254            "grpc" => Ok(Self::Grpc),
255            "http-binary" => Ok(Self::HttpBinary),
256            "http-json" => Ok(Self::HttpJson),
257            s => Err(anyhow!("unknown tracing protocol: {s}")),
258        }
259    }
260}
261
262impl From<TracingProtocol> for Protocol {
263    fn from(value: TracingProtocol) -> Self {
264        match value {
265            TracingProtocol::Grpc => Self::Grpc,
266            TracingProtocol::HttpBinary => Self::HttpBinary,
267            TracingProtocol::HttpJson => Self::HttpJson,
268        }
269    }
270}
271
272// -----------------------------------------------------------------------------
273// LogFormat
274// -----------------------------------------------------------------------------
275
/// Tracing event log format.
///
/// Selects how `TracingConfig::create_subscriber` configures the stdout layer.
/// The serde and `Display` representations are the lowercase variant names;
/// `FromStr` accepts the same names plus `full` as an alias for `verbose`.
#[derive(DebugAsJson, strum::Display, Clone, Copy, Eq, PartialEq, serde::Serialize)]
pub enum TracingLogFormat {
    /// Minimal format: Time (no date), level, and message.
    ///
    /// Thread ids, thread names and targets are disabled and
    /// `TracingMinimalTimer` is used as the timer.
    #[serde(rename = "minimal")]
    #[strum(to_string = "minimal")]
    Minimal,

    /// Normal format: Default `tracing` crate configuration.
    #[serde(rename = "normal")]
    #[strum(to_string = "normal")]
    Normal,

    /// Verbose format: Full datetime, level, thread, target, and message.
    ///
    /// Thread ids, thread names and targets are all enabled.
    #[serde(rename = "verbose")]
    #[strum(to_string = "verbose")]
    Verbose,

    /// JSON format: Verbose information formatted as JSON.
    ///
    /// Events are rendered by `TracingJsonFormatter`.
    #[serde(rename = "json")]
    #[strum(to_string = "json")]
    Json,
}
299
300impl FromStr for TracingLogFormat {
301    type Err = anyhow::Error;
302
303    fn from_str(s: &str) -> anyhow::Result<Self, Self::Err> {
304        match s.to_lowercase().trim() {
305            "json" => Ok(Self::Json),
306            "minimal" => Ok(Self::Minimal),
307            "normal" => Ok(Self::Normal),
308            "verbose" | "full" => Ok(Self::Verbose),
309            s => Err(anyhow!("unknown log format: {s}")),
310        }
311    }
312}
313
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a config that uses gRPC and no extra headers, so each test only
    /// spells out the collector URL and log format it cares about.
    fn config(tracing_url: Option<&str>, tracing_log_format: TracingLogFormat) -> TracingConfig {
        TracingConfig {
            tracing_url: tracing_url.map(str::to_owned),
            tracing_protocol: TracingProtocol::Grpc,
            tracing_headers: vec![],
            tracing_log_format,
        }
    }

    // -------------------------------------------------------------------------
    // Subscriber creation (smoke tests: building the subscriber must not panic)
    // -------------------------------------------------------------------------

    #[test]
    fn test_tracing_config_with_json_format() {
        config(None, TracingLogFormat::Json).create_subscriber(&None);
    }

    #[test]
    fn test_tracing_config_with_minimal_format() {
        config(None, TracingLogFormat::Minimal).create_subscriber(&None);
    }

    #[test]
    fn test_tracing_config_with_normal_format() {
        config(None, TracingLogFormat::Normal).create_subscriber(&None);
    }

    #[test]
    fn test_tracing_config_with_verbose_format() {
        config(None, TracingLogFormat::Verbose).create_subscriber(&None);
    }

    // Runs under tokio, as the original test did, because the exporter is built here.
    #[tokio::test]
    async fn test_tracing_config_with_opentelemetry() {
        config(Some("http://localhost:4317"), TracingLogFormat::Normal).create_subscriber(&None);
    }

    #[test]
    fn test_tracing_config_with_sentry() {
        let sentry_config = SentryConfig {
            sentry_url: "http://localhost:1234".to_string(),
        };
        config(None, TracingLogFormat::Normal).create_subscriber(&Some(sentry_config));
    }

    // NOTE(review): despite its name, this test uses the same configuration as
    // the normal-format test; `TracingConfig` has no tokio-console option in
    // this file. Kept so the existing test name keeps running.
    #[tokio::test]
    async fn test_tracing_config_with_tokio_console() {
        config(None, TracingLogFormat::Normal).create_subscriber(&None);
    }

    // -------------------------------------------------------------------------
    // TracingProtocol
    // -------------------------------------------------------------------------

    #[test]
    fn test_tracing_protocol_from_str() {
        assert_eq!(TracingProtocol::from_str("grpc").unwrap(), TracingProtocol::Grpc);
        assert_eq!(TracingProtocol::from_str("http-binary").unwrap(), TracingProtocol::HttpBinary);
        assert_eq!(TracingProtocol::from_str("http-json").unwrap(), TracingProtocol::HttpJson);
        assert!(TracingProtocol::from_str("invalid").is_err());
    }

    #[test]
    fn test_tracing_protocol_from_str_normalizes_input() {
        assert_eq!(TracingProtocol::from_str("GRPC").unwrap(), TracingProtocol::Grpc);
        assert_eq!(TracingProtocol::from_str("  Http-Json ").unwrap(), TracingProtocol::HttpJson);
        assert!(TracingProtocol::from_str("").is_err());
    }

    #[test]
    fn test_tracing_protocol_display() {
        assert_eq!(TracingProtocol::Grpc.to_string(), "grpc");
        assert_eq!(TracingProtocol::HttpBinary.to_string(), "http-binary");
        assert_eq!(TracingProtocol::HttpJson.to_string(), "http-json");
    }

    #[test]
    fn test_tracing_protocol_into_protocol() {
        assert_eq!(Protocol::from(TracingProtocol::Grpc), Protocol::Grpc);
        assert_eq!(Protocol::from(TracingProtocol::HttpBinary), Protocol::HttpBinary);
        assert_eq!(Protocol::from(TracingProtocol::HttpJson), Protocol::HttpJson);
    }

    // -------------------------------------------------------------------------
    // TracingLogFormat
    // -------------------------------------------------------------------------

    #[test]
    fn test_tracing_log_format_from_str() {
        assert_eq!(TracingLogFormat::from_str("json").unwrap(), TracingLogFormat::Json);
        assert_eq!(TracingLogFormat::from_str("minimal").unwrap(), TracingLogFormat::Minimal);
        assert_eq!(TracingLogFormat::from_str("normal").unwrap(), TracingLogFormat::Normal);
        assert_eq!(TracingLogFormat::from_str("verbose").unwrap(), TracingLogFormat::Verbose);
        assert_eq!(TracingLogFormat::from_str("full").unwrap(), TracingLogFormat::Verbose);
        assert!(TracingLogFormat::from_str("invalid").is_err());
    }

    #[test]
    fn test_tracing_log_format_from_str_normalizes_input() {
        assert_eq!(TracingLogFormat::from_str(" JSON ").unwrap(), TracingLogFormat::Json);
        assert_eq!(TracingLogFormat::from_str("Full").unwrap(), TracingLogFormat::Verbose);
        assert!(TracingLogFormat::from_str("").is_err());
    }

    #[test]
    fn test_tracing_log_format_display() {
        assert_eq!(TracingLogFormat::Json.to_string(), "json");
        assert_eq!(TracingLogFormat::Minimal.to_string(), "minimal");
        assert_eq!(TracingLogFormat::Normal.to_string(), "normal");
        assert_eq!(TracingLogFormat::Verbose.to_string(), "verbose");
    }
}
419}