1use std::collections::HashMap;
2use std::io::IsTerminal;
3use std::io::stdout;
4use std::str::FromStr;
5
6use anyhow::anyhow;
7use clap::Parser;
8use display_json::DebugAsJson;
9use http::HeaderMap;
10use http::header::HeaderName;
11use http::header::HeaderValue;
12use itertools::Itertools;
13use opentelemetry::KeyValue;
14use opentelemetry::trace::TracerProvider;
15use opentelemetry_otlp::Protocol;
16use opentelemetry_otlp::WithExportConfig;
17use opentelemetry_otlp::WithHttpConfig;
18use opentelemetry_otlp::WithTonicConfig;
19use opentelemetry_otlp::tonic_types::metadata::MetadataMap;
20use opentelemetry_sdk::Resource as SdkResource;
21use opentelemetry_sdk::trace::BatchConfigBuilder;
22use opentelemetry_sdk::trace::SdkTracerProvider;
23use opentelemetry_sdk::trace::Tracer as SdkTracer;
24use tracing_subscriber::EnvFilter;
25use tracing_subscriber::Layer;
26use tracing_subscriber::filter::LevelFilter;
27use tracing_subscriber::fmt;
28use tracing_subscriber::layer::SubscriberExt;
29use tracing_subscriber::util::SubscriberInitExt;
30
31use crate::infra::build_info;
32use crate::infra::sentry::SentryConfig;
33use crate::infra::tracing::TracingContextLayer;
34use crate::infra::tracing::TracingJsonFormatter;
35use crate::infra::tracing::TracingMinimalTimer;
36
// Command-line / environment configuration for tracing export and console log output.
//
// NOTE(review): plain `//` comments are used here on purpose. clap's derive turns `///` doc
// comments on a `Parser` struct and its fields into help text, which would change `--help`.
#[derive(DebugAsJson, Clone, Parser, serde::Serialize)]
pub struct TracingConfig {
    // Collector URL for the OpenTelemetry exporter (`--tracing-url`, alias
    // `--tracing-collector-url`, or `TRACING_URL`). When unset, `create_subscriber` skips the
    // OpenTelemetry layer.
    #[arg(long = "tracing-url", alias = "tracing-collector-url", env = "TRACING_URL")]
    pub tracing_url: Option<String>,

    // Transport used by the OpenTelemetry exporter; parsed through `TracingProtocol::from_str`
    // and defaulting to "grpc".
    #[arg(long = "tracing-protocol", env = "TRACING_PROTOCOL", default_value = "grpc")]
    pub tracing_protocol: TracingProtocol,

    // Comma-separated list of entries passed to the exporter; `opentelemetry_tracer` splits each
    // entry on the first `=` into a header name and value.
    #[arg(long = "tracing-headers", env = "TRACING_HEADERS", value_delimiter = ',')]
    pub tracing_headers: Vec<String>,

    // Console log format; parsed through `TracingLogFormat::from_str` and defaulting to "normal".
    #[arg(long = "tracing-log-format", env = "TRACING_LOG_FORMAT", default_value = "normal")]
    pub tracing_log_format: TracingLogFormat,
}
59
60impl TracingConfig {
61 pub fn init(&self, sentry_config: &Option<SentryConfig>) -> anyhow::Result<()> {
65 match self.create_subscriber(sentry_config).try_init() {
66 Ok(()) => Ok(()),
67 Err(e) => {
68 println!("failed to create tracing registry | reason={e:?}");
69 Err(e.into())
70 }
71 }
72 }
73 pub fn create_subscriber(&self, sentry_config: &Option<SentryConfig>) -> impl SubscriberInitExt {
74 println!("creating tracing registry");
75
76 println!("tracing registry: enabling tracing context recorder");
78 let tracing_context_layer = TracingContextLayer.with_filter(EnvFilter::from_default_env());
79
80 let enable_ansi = stdout().is_terminal();
82 println!(
83 "tracing registry: enabling console logs | format={} ansi={}",
84 self.tracing_log_format, enable_ansi
85 );
86 let stdout_layer = match self.tracing_log_format {
87 TracingLogFormat::Json => fmt::Layer::default()
88 .event_format(TracingJsonFormatter)
89 .with_filter(EnvFilter::from_default_env())
90 .boxed(),
91 TracingLogFormat::Minimal => fmt::Layer::default()
92 .with_thread_ids(false)
93 .with_thread_names(false)
94 .with_target(false)
95 .with_ansi(enable_ansi)
96 .with_timer(TracingMinimalTimer)
97 .with_filter(EnvFilter::from_default_env())
98 .boxed(),
99 TracingLogFormat::Normal => fmt::Layer::default().with_ansi(enable_ansi).with_filter(EnvFilter::from_default_env()).boxed(),
100 TracingLogFormat::Verbose => fmt::Layer::default()
101 .with_ansi(enable_ansi)
102 .with_target(true)
103 .with_thread_ids(true)
104 .with_thread_names(true)
105 .with_filter(EnvFilter::from_default_env())
106 .boxed(),
107 };
108
109 let opentelemetry_layer = match &self.tracing_url {
111 Some(url) => {
112 let tracer = opentelemetry_tracer(url, self.tracing_protocol, &self.tracing_headers);
113 let layer = tracing_opentelemetry::layer()
114 .with_tracked_inactivity(false)
115 .with_tracer(tracer)
116 .with_filter(EnvFilter::from_default_env());
117 Some(layer)
118 }
119 None => {
120 println!("tracing registry: skipping opentelemetry exporter");
121 None
122 }
123 };
124
125 let sentry_layer = match &sentry_config {
126 Some(sentry_config) => {
127 println!("tracing registry: enabling sentry exporter | url={}", sentry_config.sentry_url);
128 let layer = sentry_tracing::layer()
129 .event_filter(crate::infra::sentry::sentry_event_filter)
130 .span_filter(crate::infra::sentry::sentry_span_filter)
131 .with_filter(LevelFilter::ERROR)
132 .with_filter(EnvFilter::from_default_env());
133 Some(layer)
134 }
135 None => {
136 println!("tracing registry: skipping sentry exporter");
137 None
138 }
139 };
140
141 tracing_subscriber::registry()
142 .with(tracing_context_layer)
143 .with(stdout_layer)
144 .with(opentelemetry_layer)
145 .with(sentry_layer)
146 }
147}
148
149fn opentelemetry_tracer(url: &str, protocol: TracingProtocol, headers: &[String]) -> SdkTracer {
150 println!(
151 "tracing registry: enabling opentelemetry exporter | url={} protocol={} headers={} service={}",
152 url,
153 protocol,
154 headers.len(),
155 build_info::service_name()
156 );
157
158 let headers = headers
160 .iter()
161 .map(|header| {
162 let mut parts = header.splitn(2, '=');
163 let key = parts.next().unwrap();
164 let value = parts.next().unwrap_or_default();
165 (key, value)
166 })
167 .collect_vec();
168
169 let resource = SdkResource::builder()
170 .with_attributes([KeyValue::new("service.name", build_info::service_name())])
171 .build();
172
173 match protocol {
175 TracingProtocol::Grpc => {
176 let mut http_metadata = HeaderMap::new();
177 for (key, value) in headers {
178 let header_name = HeaderName::from_str(key).unwrap();
179 let header_value = HeaderValue::from_str(value).unwrap();
180 http_metadata.insert(header_name, header_value);
181 }
182
183 let protocol_metadata = MetadataMap::from_headers(http_metadata);
184
185 let exporter = opentelemetry_otlp::SpanExporter::builder()
186 .with_tonic()
187 .with_endpoint(url)
188 .with_metadata(protocol_metadata)
189 .build()
190 .unwrap();
191
192 let batch_config = BatchConfigBuilder::default().with_max_queue_size(u16::MAX as usize).build();
193 let batch_processor = opentelemetry_sdk::trace::BatchSpanProcessor::builder(exporter)
194 .with_batch_config(batch_config)
195 .build();
196
197 SdkTracerProvider::builder()
198 .with_resource(resource.clone())
199 .with_span_processor(batch_processor)
200 .build()
201 .tracer(build_info::binary_name())
202 }
203 TracingProtocol::HttpBinary | TracingProtocol::HttpJson => {
204 let mut protocol_headers = HashMap::new();
205 for (key, value) in headers {
206 protocol_headers.insert(key.to_owned(), value.to_owned());
207 }
208
209 let exporter = opentelemetry_otlp::SpanExporter::builder()
210 .with_http()
211 .with_endpoint(url)
212 .with_headers(protocol_headers)
213 .build()
214 .unwrap();
215
216 let batch_config = BatchConfigBuilder::default().with_max_queue_size(u16::MAX as usize).build();
217 let batch_processor = opentelemetry_sdk::trace::BatchSpanProcessor::builder(exporter)
218 .with_batch_config(batch_config)
219 .build();
220
221 SdkTracerProvider::builder()
222 .with_resource(resource.clone())
223 .with_span_processor(batch_processor)
224 .build()
225 .tracer(build_info::binary_name())
226 }
227 }
228}
229
/// Transport used to send traces to the collector.
///
/// The `serde` and `strum` names of each variant match the strings accepted by the `FromStr`
/// implementation in this file.
#[derive(DebugAsJson, strum::Display, Clone, Copy, Eq, PartialEq, serde::Serialize)]
pub enum TracingProtocol {
    /// gRPC transport; `opentelemetry_tracer` builds a tonic exporter for it. Named `grpc`.
    #[serde(rename = "grpc")]
    #[strum(to_string = "grpc")]
    Grpc,

    /// HTTP transport that converts to `Protocol::HttpBinary`. Named `http-binary`.
    #[serde(rename = "http-binary")]
    #[strum(to_string = "http-binary")]
    HttpBinary,

    /// HTTP transport that converts to `Protocol::HttpJson`. Named `http-json`.
    #[serde(rename = "http-json")]
    #[strum(to_string = "http-json")]
    HttpJson,
}
248
249impl FromStr for TracingProtocol {
250 type Err = anyhow::Error;
251
252 fn from_str(s: &str) -> anyhow::Result<Self, Self::Err> {
253 match s.to_lowercase().trim() {
254 "grpc" => Ok(Self::Grpc),
255 "http-binary" => Ok(Self::HttpBinary),
256 "http-json" => Ok(Self::HttpJson),
257 s => Err(anyhow!("unknown tracing protocol: {s}")),
258 }
259 }
260}
261
262impl From<TracingProtocol> for Protocol {
263 fn from(value: TracingProtocol) -> Self {
264 match value {
265 TracingProtocol::Grpc => Self::Grpc,
266 TracingProtocol::HttpBinary => Self::HttpBinary,
267 TracingProtocol::HttpJson => Self::HttpJson,
268 }
269 }
270}
271
/// Format of the log events written to the console by `TracingConfig::create_subscriber`.
///
/// The `serde` and `strum` names of each variant match the strings accepted by the `FromStr`
/// implementation in this file.
#[derive(DebugAsJson, strum::Display, Clone, Copy, Eq, PartialEq, serde::Serialize)]
pub enum TracingLogFormat {
    /// Output without thread ids, thread names or targets, using `TracingMinimalTimer`.
    #[serde(rename = "minimal")]
    #[strum(to_string = "minimal")]
    Minimal,

    /// Default `fmt` layer output; only the ANSI setting is adjusted.
    #[serde(rename = "normal")]
    #[strum(to_string = "normal")]
    Normal,

    /// Output including targets, thread ids and thread names. Also parsed from `full`.
    #[serde(rename = "verbose")]
    #[strum(to_string = "verbose")]
    Verbose,

    /// Events formatted by `TracingJsonFormatter`.
    #[serde(rename = "json")]
    #[strum(to_string = "json")]
    Json,
}
299
300impl FromStr for TracingLogFormat {
301 type Err = anyhow::Error;
302
303 fn from_str(s: &str) -> anyhow::Result<Self, Self::Err> {
304 match s.to_lowercase().trim() {
305 "json" => Ok(Self::Json),
306 "minimal" => Ok(Self::Minimal),
307 "normal" => Ok(Self::Normal),
308 "verbose" | "full" => Ok(Self::Verbose),
309 s => Err(anyhow!("unknown log format: {s}")),
310 }
311 }
312}
313
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a gRPC config without headers, varying only the collector URL and log format.
    fn config_with(tracing_url: Option<&str>, tracing_log_format: TracingLogFormat) -> TracingConfig {
        TracingConfig {
            tracing_url: tracing_url.map(String::from),
            tracing_protocol: TracingProtocol::Grpc,
            tracing_headers: Vec::new(),
            tracing_log_format,
        }
    }

    // The subscriber tests below only check that assembling the registry does not panic.

    #[test]
    fn test_tracing_config_with_json_format() {
        config_with(None, TracingLogFormat::Json).create_subscriber(&None);
    }

    #[test]
    fn test_tracing_config_with_minimal_format() {
        config_with(None, TracingLogFormat::Minimal).create_subscriber(&None);
    }

    #[test]
    fn test_tracing_config_with_normal_format() {
        config_with(None, TracingLogFormat::Normal).create_subscriber(&None);
    }

    #[test]
    fn test_tracing_config_with_verbose_format() {
        config_with(None, TracingLogFormat::Verbose).create_subscriber(&None);
    }

    #[tokio::test]
    async fn test_tracing_config_with_opentelemetry() {
        config_with(Some("http://localhost:4317"), TracingLogFormat::Normal).create_subscriber(&None);
    }

    #[test]
    fn test_tracing_config_with_sentry() {
        let sentry = SentryConfig {
            sentry_url: String::from("http://localhost:1234"),
        };
        config_with(None, TracingLogFormat::Normal).create_subscriber(&Some(sentry));
    }

    #[tokio::test]
    async fn test_tracing_config_with_tokio_console() {
        config_with(None, TracingLogFormat::Normal).create_subscriber(&None);
    }

    #[test]
    fn test_tracing_protocol_from_str() {
        let known = [
            ("grpc", TracingProtocol::Grpc),
            ("http-binary", TracingProtocol::HttpBinary),
            ("http-json", TracingProtocol::HttpJson),
        ];
        for (input, expected) in known {
            assert_eq!(TracingProtocol::from_str(input).unwrap(), expected);
        }
        assert!(TracingProtocol::from_str("invalid").is_err());
    }

    #[test]
    fn test_tracing_protocol_display() {
        let rendered = [
            (TracingProtocol::Grpc, "grpc"),
            (TracingProtocol::HttpBinary, "http-binary"),
            (TracingProtocol::HttpJson, "http-json"),
        ];
        for (protocol, expected) in rendered {
            assert_eq!(protocol.to_string(), expected);
        }
    }

    #[test]
    fn test_tracing_protocol_into_protocol() {
        let converted = [
            (TracingProtocol::Grpc, Protocol::Grpc),
            (TracingProtocol::HttpBinary, Protocol::HttpBinary),
            (TracingProtocol::HttpJson, Protocol::HttpJson),
        ];
        for (protocol, expected) in converted {
            assert_eq!(Protocol::from(protocol), expected);
        }
    }
}