stratus/infra/tracing/
tracing_config.rs

1use std::collections::HashMap;
2use std::io::IsTerminal;
3use std::io::stdout;
4use std::str::FromStr;
5
6use anyhow::anyhow;
7use clap::Parser;
8use display_json::DebugAsJson;
9use itertools::Itertools;
10use opentelemetry::KeyValue;
11use opentelemetry::trace::TracerProvider;
12use opentelemetry_otlp::Protocol;
13use opentelemetry_otlp::WithExportConfig;
14use opentelemetry_otlp::WithHttpConfig;
15use opentelemetry_otlp::WithTonicConfig;
16use opentelemetry_otlp::tonic_types::metadata::MetadataMap;
17use opentelemetry_sdk::Resource as SdkResource;
18use opentelemetry_sdk::trace::BatchConfigBuilder;
19use opentelemetry_sdk::trace::SdkTracerProvider;
20use opentelemetry_sdk::trace::Tracer as SdkTracer;
21use tonic::metadata::MetadataKey;
22use tracing_subscriber::EnvFilter;
23use tracing_subscriber::Layer;
24use tracing_subscriber::fmt;
25use tracing_subscriber::layer::SubscriberExt;
26use tracing_subscriber::util::SubscriberInitExt;
27
28use crate::infra::build_info;
29use crate::infra::sentry::SentryConfig;
30use crate::infra::tracing::TracingContextLayer;
31use crate::infra::tracing::TracingJsonFormatter;
32use crate::infra::tracing::TracingMinimalTimer;
33
34// -----------------------------------------------------------------------------
35// Config
36// -----------------------------------------------------------------------------
37
// Command-line / environment configuration for application tracing: the stdout log format and the
// optional OpenTelemetry exporter.
//
// Maintainer notes in this struct are plain `//` comments on purpose: clap's derive turns `///`
// comments into user-facing help text.
#[derive(DebugAsJson, Clone, Parser, serde::Serialize)]
pub struct TracingConfig {
    /// OpenTelemetry server URL.
    // When absent, no OpenTelemetry exporter layer is created.
    #[arg(long = "tracing-url", alias = "tracing-collector-url", env = "TRACING_URL")]
    pub tracing_url: Option<String>,

    /// OpenTelemetry server communication protocol.
    // Parsed through `TracingProtocol::from_str`: `grpc`, `http-binary` or `http-json`.
    #[arg(long = "tracing-protocol", env = "TRACING_PROTOCOL", default_value = "grpc")]
    pub tracing_protocol: TracingProtocol,

    /// OpenTelemetry additional HTTP headers or GRPC metadata.
    // Comma-delimited list; each entry is later split at its first `=` into a key and a value.
    #[arg(long = "tracing-headers", env = "TRACING_HEADERS", value_delimiter = ',')]
    pub tracing_headers: Vec<String>,

    /// How tracing events will be formatted when displayed in stdout.
    // Parsed through `TracingLogFormat::from_str`: `minimal`, `normal`, `verbose` (alias `full`) or `json`.
    #[arg(long = "tracing-log-format", env = "TRACING_LOG_FORMAT", default_value = "normal")]
    pub tracing_log_format: TracingLogFormat,
}
56
57impl TracingConfig {
58    /// Inits application global tracing registry.
59    ///
60    /// Uses println! to have information available in stdout before tracing is initialized.
61    pub fn init(&self, sentry_config: &Option<SentryConfig>) -> anyhow::Result<()> {
62        match self.create_subscriber(sentry_config).try_init() {
63            Ok(()) => Ok(()),
64            Err(e) => {
65                println!("failed to create tracing registry | reason={e:?}");
66                Err(e.into())
67            }
68        }
69    }
70    pub fn create_subscriber(&self, sentry_config: &Option<SentryConfig>) -> impl SubscriberInitExt {
71        println!("creating tracing registry");
72
73        // configure tracing context layer
74        println!("tracing registry: enabling tracing context recorder");
75        let tracing_context_layer = TracingContextLayer.with_filter(EnvFilter::from_default_env());
76
77        // configure stdout log layer
78        let enable_ansi = stdout().is_terminal();
79        println!(
80            "tracing registry: enabling console logs | format={} ansi={}",
81            self.tracing_log_format, enable_ansi
82        );
83        let stdout_layer = match self.tracing_log_format {
84            TracingLogFormat::Json => fmt::Layer::default()
85                .event_format(TracingJsonFormatter)
86                .with_filter(EnvFilter::from_default_env())
87                .boxed(),
88            TracingLogFormat::Minimal => fmt::Layer::default()
89                .with_thread_ids(false)
90                .with_thread_names(false)
91                .with_target(false)
92                .with_ansi(enable_ansi)
93                .with_timer(TracingMinimalTimer)
94                .with_filter(EnvFilter::from_default_env())
95                .boxed(),
96            TracingLogFormat::Normal => fmt::Layer::default().with_ansi(enable_ansi).with_filter(EnvFilter::from_default_env()).boxed(),
97            TracingLogFormat::Verbose => fmt::Layer::default()
98                .with_ansi(enable_ansi)
99                .with_target(true)
100                .with_thread_ids(true)
101                .with_thread_names(true)
102                .with_filter(EnvFilter::from_default_env())
103                .boxed(),
104        };
105
106        // configure opentelemetry layer
107        let opentelemetry_layer = match &self.tracing_url {
108            Some(url) => {
109                let tracer = opentelemetry_tracer(url, self.tracing_protocol, &self.tracing_headers);
110                let layer = tracing_opentelemetry::layer()
111                    .with_tracked_inactivity(false)
112                    .with_tracer(tracer)
113                    .with_filter(EnvFilter::from_default_env());
114                Some(layer)
115            }
116            None => {
117                println!("tracing registry: skipping opentelemetry exporter");
118                None
119            }
120        };
121
122        // configure sentry layer
123        let sentry_layer = match &sentry_config {
124            Some(sentry_config) => {
125                println!("tracing registry: enabling sentry exporter | url={}", sentry_config.sentry_url);
126                let layer = sentry_tracing::layer().with_filter(EnvFilter::from_default_env());
127                Some(layer)
128            }
129            None => {
130                println!("tracing registry: skipping sentry exporter");
131                None
132            }
133        };
134
135        tracing_subscriber::registry()
136            .with(tracing_context_layer)
137            .with(stdout_layer)
138            .with(opentelemetry_layer)
139            .with(sentry_layer)
140    }
141}
142
143fn opentelemetry_tracer(url: &str, protocol: TracingProtocol, headers: &[String]) -> SdkTracer {
144    println!(
145        "tracing registry: enabling opentelemetry exporter | url={} protocol={} headers={} service={}",
146        url,
147        protocol,
148        headers.len(),
149        build_info::service_name()
150    );
151
152    // configure headers
153    let headers = headers
154        .iter()
155        .map(|header| {
156            let mut parts = header.splitn(2, '=');
157            let key = parts.next().unwrap();
158            let value = parts.next().unwrap_or_default();
159            (key, value)
160        })
161        .collect_vec();
162
163    let resource = SdkResource::builder()
164        .with_attributes([KeyValue::new("service.name", build_info::service_name())])
165        .build();
166
167    // configure tracer
168    match protocol {
169        TracingProtocol::Grpc => {
170            let mut protocol_metadata = MetadataMap::new();
171            for (key, value) in headers {
172                protocol_metadata.insert(MetadataKey::from_str(key).unwrap(), value.parse().unwrap());
173            }
174
175            let exporter = opentelemetry_otlp::SpanExporter::builder()
176                .with_tonic()
177                .with_endpoint(url)
178                .with_metadata(protocol_metadata)
179                .build()
180                .unwrap();
181
182            let batch_config = BatchConfigBuilder::default().with_max_queue_size(u16::MAX as usize).build();
183            let batch_processor = opentelemetry_sdk::trace::BatchSpanProcessor::builder(exporter)
184                .with_batch_config(batch_config)
185                .build();
186
187            SdkTracerProvider::builder()
188                .with_resource(resource.clone())
189                .with_span_processor(batch_processor)
190                .build()
191                .tracer(build_info::binary_name())
192        }
193        TracingProtocol::HttpBinary | TracingProtocol::HttpJson => {
194            let mut protocol_headers = HashMap::new();
195            for (key, value) in headers {
196                protocol_headers.insert(key.to_owned(), value.to_owned());
197            }
198
199            let exporter = opentelemetry_otlp::SpanExporter::builder()
200                .with_http()
201                .with_endpoint(url)
202                .with_headers(protocol_headers)
203                .build()
204                .unwrap();
205
206            let batch_config = BatchConfigBuilder::default().with_max_queue_size(u16::MAX as usize).build();
207            let batch_processor = opentelemetry_sdk::trace::BatchSpanProcessor::builder(exporter)
208                .with_batch_config(batch_config)
209                .build();
210
211            SdkTracerProvider::builder()
212                .with_resource(resource.clone())
213                .with_span_processor(batch_processor)
214                .build()
215                .tracer(build_info::binary_name())
216        }
217    }
218}
219
220// -----------------------------------------------------------------------------
221// Protocol
222// -----------------------------------------------------------------------------
223
/// Protocol used to communicate with the OpenTelemetry server.
///
/// Each variant serializes and displays as the lowercase, dash-separated name given in its attributes.
#[derive(DebugAsJson, strum::Display, Clone, Copy, Eq, PartialEq, serde::Serialize)]
pub enum TracingProtocol {
    /// gRPC protocol, written as `grpc`.
    #[serde(rename = "grpc")]
    #[strum(to_string = "grpc")]
    Grpc,

    /// HTTP protocol, binary variant, written as `http-binary`.
    #[serde(rename = "http-binary")]
    #[strum(to_string = "http-binary")]
    HttpBinary,

    /// HTTP protocol, JSON variant, written as `http-json`.
    #[serde(rename = "http-json")]
    #[strum(to_string = "http-json")]
    HttpJson,
}
238
239impl FromStr for TracingProtocol {
240    type Err = anyhow::Error;
241
242    fn from_str(s: &str) -> anyhow::Result<Self, Self::Err> {
243        match s.to_lowercase().trim() {
244            "grpc" => Ok(Self::Grpc),
245            "http-binary" => Ok(Self::HttpBinary),
246            "http-json" => Ok(Self::HttpJson),
247            s => Err(anyhow!("unknown tracing protocol: {}", s)),
248        }
249    }
250}
251
252impl From<TracingProtocol> for Protocol {
253    fn from(value: TracingProtocol) -> Self {
254        match value {
255            TracingProtocol::Grpc => Self::Grpc,
256            TracingProtocol::HttpBinary => Self::HttpBinary,
257            TracingProtocol::HttpJson => Self::HttpJson,
258        }
259    }
260}
261
262// -----------------------------------------------------------------------------
263// LogFormat
264// -----------------------------------------------------------------------------
265
/// Tracing event log format used by the stdout layer.
///
/// Each variant serializes and displays as its lowercase name.
#[derive(DebugAsJson, strum::Display, Clone, Copy, Eq, PartialEq, serde::Serialize)]
pub enum TracingLogFormat {
    /// Minimal format: Time (no date), level, and message. Thread ids, thread names and target are hidden.
    #[serde(rename = "minimal")]
    #[strum(to_string = "minimal")]
    Minimal,

    /// Normal format: Default `tracing_subscriber` fmt layer configuration.
    #[serde(rename = "normal")]
    #[strum(to_string = "normal")]
    Normal,

    /// Verbose format: Full datetime, level, thread, target, and message. Also accepted as `full` when parsing.
    #[serde(rename = "verbose")]
    #[strum(to_string = "verbose")]
    Verbose,

    /// JSON format: Verbose information formatted as JSON.
    #[serde(rename = "json")]
    #[strum(to_string = "json")]
    Json,
}
289
290impl FromStr for TracingLogFormat {
291    type Err = anyhow::Error;
292
293    fn from_str(s: &str) -> anyhow::Result<Self, Self::Err> {
294        match s.to_lowercase().trim() {
295            "json" => Ok(Self::Json),
296            "minimal" => Ok(Self::Minimal),
297            "normal" => Ok(Self::Normal),
298            "verbose" | "full" => Ok(Self::Verbose),
299            s => Err(anyhow!("unknown log format: {}", s)),
300        }
301    }
302}
303
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a config with the given log format, the gRPC protocol, no headers and no OpenTelemetry URL.
    fn config_with_format(tracing_log_format: TracingLogFormat) -> TracingConfig {
        TracingConfig {
            tracing_url: None,
            tracing_protocol: TracingProtocol::Grpc,
            tracing_headers: vec![],
            tracing_log_format,
        }
    }

    // -------------------------------------------------------------------------
    // Subscriber creation: these only check that building the layers does not panic.
    // -------------------------------------------------------------------------

    #[test]
    fn test_tracing_config_with_json_format() {
        let config = config_with_format(TracingLogFormat::Json);
        config.create_subscriber(&None);
    }

    #[test]
    fn test_tracing_config_with_minimal_format() {
        let config = config_with_format(TracingLogFormat::Minimal);
        config.create_subscriber(&None);
    }

    #[test]
    fn test_tracing_config_with_normal_format() {
        let config = config_with_format(TracingLogFormat::Normal);
        config.create_subscriber(&None);
    }

    #[test]
    fn test_tracing_config_with_verbose_format() {
        let config = config_with_format(TracingLogFormat::Verbose);
        config.create_subscriber(&None);
    }

    #[tokio::test]
    async fn test_tracing_config_with_opentelemetry() {
        let config = TracingConfig {
            tracing_url: Some("http://localhost:4317".to_string()),
            ..config_with_format(TracingLogFormat::Normal)
        };
        config.create_subscriber(&None);
    }

    #[test]
    fn test_tracing_config_with_sentry() {
        let sentry_config = SentryConfig {
            sentry_url: "http://localhost:1234".to_string(),
        };
        let config = config_with_format(TracingLogFormat::Normal);
        config.create_subscriber(&Some(sentry_config));
    }

    // NOTE(review): nothing tokio-console specific is configured here; this only exercises the
    // normal format inside a tokio runtime. Confirm whether a console layer is still expected.
    #[tokio::test]
    async fn test_tracing_config_with_tokio_console() {
        let config = config_with_format(TracingLogFormat::Normal);
        config.create_subscriber(&None);
    }

    // -------------------------------------------------------------------------
    // TracingProtocol
    // -------------------------------------------------------------------------

    #[test]
    fn test_tracing_protocol_from_str() {
        assert_eq!(TracingProtocol::from_str("grpc").unwrap(), TracingProtocol::Grpc);
        assert_eq!(TracingProtocol::from_str("http-binary").unwrap(), TracingProtocol::HttpBinary);
        assert_eq!(TracingProtocol::from_str("http-json").unwrap(), TracingProtocol::HttpJson);
        assert!(TracingProtocol::from_str("invalid").is_err());
    }

    #[test]
    fn test_tracing_protocol_from_str_ignores_case_and_whitespace() {
        assert_eq!(TracingProtocol::from_str(" GRPC ").unwrap(), TracingProtocol::Grpc);
        assert_eq!(TracingProtocol::from_str("Http-Binary").unwrap(), TracingProtocol::HttpBinary);
        assert_eq!(TracingProtocol::from_str("HTTP-JSON\n").unwrap(), TracingProtocol::HttpJson);
        assert!(TracingProtocol::from_str("").is_err());
    }

    #[test]
    fn test_tracing_protocol_display() {
        assert_eq!(TracingProtocol::Grpc.to_string(), "grpc");
        assert_eq!(TracingProtocol::HttpBinary.to_string(), "http-binary");
        assert_eq!(TracingProtocol::HttpJson.to_string(), "http-json");
    }

    #[test]
    fn test_tracing_protocol_into_protocol() {
        assert_eq!(Protocol::from(TracingProtocol::Grpc), Protocol::Grpc);
        assert_eq!(Protocol::from(TracingProtocol::HttpBinary), Protocol::HttpBinary);
        assert_eq!(Protocol::from(TracingProtocol::HttpJson), Protocol::HttpJson);
    }

    // -------------------------------------------------------------------------
    // TracingLogFormat
    // -------------------------------------------------------------------------

    #[test]
    fn test_tracing_log_format_from_str() {
        assert_eq!(TracingLogFormat::from_str("minimal").unwrap(), TracingLogFormat::Minimal);
        assert_eq!(TracingLogFormat::from_str("normal").unwrap(), TracingLogFormat::Normal);
        assert_eq!(TracingLogFormat::from_str("verbose").unwrap(), TracingLogFormat::Verbose);
        assert_eq!(TracingLogFormat::from_str("json").unwrap(), TracingLogFormat::Json);
        assert!(TracingLogFormat::from_str("invalid").is_err());
    }

    #[test]
    fn test_tracing_log_format_from_str_accepts_full_alias() {
        assert_eq!(TracingLogFormat::from_str("full").unwrap(), TracingLogFormat::Verbose);
    }

    #[test]
    fn test_tracing_log_format_from_str_ignores_case_and_whitespace() {
        assert_eq!(TracingLogFormat::from_str(" JSON ").unwrap(), TracingLogFormat::Json);
        assert_eq!(TracingLogFormat::from_str("Minimal").unwrap(), TracingLogFormat::Minimal);
        assert!(TracingLogFormat::from_str("").is_err());
    }

    #[test]
    fn test_tracing_log_format_display() {
        assert_eq!(TracingLogFormat::Minimal.to_string(), "minimal");
        assert_eq!(TracingLogFormat::Normal.to_string(), "normal");
        assert_eq!(TracingLogFormat::Verbose.to_string(), "verbose");
        assert_eq!(TracingLogFormat::Json.to_string(), "json");
    }
}