stratus/infra/tracing/
tracing_services.rs

1//! Tracing services.
2
3use std::collections::HashMap;
4use std::fmt::Debug;
5use std::fmt::Display;
6use std::thread::Thread;
7
8use chrono::DateTime;
9use chrono::Local;
10use chrono::Utc;
11use serde::Serialize;
12use serde::ser::SerializeStruct;
13use tracing::Span;
14use tracing::Subscriber;
15use tracing::span;
16use tracing::span::Attributes;
17use tracing_serde::AsSerde;
18use tracing_serde::SerializeLevel;
19use tracing_serde::fields::AsMap;
20use tracing_serde::fields::SerializeFieldMap;
21use tracing_subscriber::Layer;
22use tracing_subscriber::fmt;
23use tracing_subscriber::fmt::FormatEvent;
24use tracing_subscriber::fmt::format::DefaultFields;
25use tracing_subscriber::fmt::time::FormatTime;
26use tracing_subscriber::registry::LookupSpan;
27
28use crate::alias::JsonValue;
29use crate::ext::not;
30use crate::ext::to_json_string;
31use crate::ext::to_json_value;
32
33// -----------------------------------------------------------------------------
34// Tracing service: Span field recorder
35// -----------------------------------------------------------------------------
36pub struct TracingContextLayer;
37
38impl<S> Layer<S> for TracingContextLayer
39where
40    S: Subscriber + for<'a> LookupSpan<'a>,
41{
42    fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &span::Id, ctx: tracing_subscriber::layer::Context<'_, S>) {
43        let Some(span) = ctx.span(id) else { return };
44        span.extensions_mut().insert(SpanFields::new(attrs.field_map()));
45    }
46
47    fn on_record(&self, id: &span::Id, values: &span::Record<'_>, ctx: tracing_subscriber::layer::Context<'_, S>) {
48        let Some(span) = ctx.span(id) else { return };
49        let mut extensions = span.extensions_mut();
50        if let Some(map) = extensions.get_mut::<SpanFields>() {
51            map.record(values.field_map());
52        }
53    }
54}
55
56#[derive(derive_more::Deref, derive_more::DerefMut)]
57struct SpanFields(#[deref] JsonValue);
58
59impl SpanFields {
60    fn new(fields: SerializeFieldMap<'_, Attributes>) -> Self {
61        let fields = to_json_value(fields);
62        Self(fields)
63    }
64
65    fn record(&mut self, fields: SerializeFieldMap<'_, span::Record>) {
66        let mut new_fields = to_json_value(fields);
67        let Some(new_fields) = new_fields.as_object_mut() else { return };
68
69        let Some(current_fields) = self.as_object_mut() else { return };
70        for (new_field_key, new_field_value) in new_fields.into_iter() {
71            current_fields.insert(new_field_key.to_owned(), new_field_value.to_owned());
72        }
73    }
74}
75
76// -----------------------------------------------------------------------------
77// Tracing service: - Json Formatter
78// -----------------------------------------------------------------------------
79
80pub struct TracingJsonFormatter;
81
82impl<S> FormatEvent<S, DefaultFields> for TracingJsonFormatter
83where
84    S: Subscriber + for<'lookup> LookupSpan<'lookup>,
85{
86    fn format_event(&self, ctx: &fmt::FmtContext<'_, S, DefaultFields>, mut writer: fmt::format::Writer<'_>, event: &tracing::Event<'_>) -> std::fmt::Result {
87        let meta = event.metadata();
88
89        let mut fields = to_json_value(event.field_map());
90        strip_empty_fields(&mut fields);
91
92        // parse spans
93        let context = ctx.lookup_current().map(|span| {
94            let mut root_span = None;
95            let mut merged_span_context = HashMap::new();
96
97            let mut span_iterator = span.scope().peekable();
98            while let Some(span) = span_iterator.next() {
99                // merge span data into a single context
100                if let Some(span_fields) = span.extensions().get::<SpanFields>().and_then(|fields| fields.as_object()) {
101                    for (field_key, field_value) in span_fields {
102                        if not(merged_span_context.contains_key(field_key)) {
103                            merged_span_context.insert(field_key.to_owned(), field_value.to_owned());
104                        }
105                    }
106                }
107
108                // track root span
109                if span_iterator.peek().is_none() {
110                    root_span = Some(span);
111                }
112            }
113
114            strip_empty_context(&mut merged_span_context);
115
116            // generate context field
117            TracingLogContextField {
118                root_span_id: root_span.as_ref().map(|s| s.id().into_u64()).unwrap_or(0),
119                root_span_name: root_span.as_ref().map(|s| s.name()).unwrap_or(""),
120                span_id: span.id().into_u64(),
121                span_name: span.name(),
122                context: merged_span_context,
123            }
124        });
125
126        // parse metadata and event
127        let log = TracingLog {
128            timestamp: Utc::now(),
129            level: meta.level().as_serde(),
130            target: meta.target(),
131            thread: std::thread::current(),
132            fields,
133            context,
134        };
135
136        writeln!(writer, "{}", to_json_string(&log))
137    }
138}
139
140#[derive(derive_new::new)]
141struct TracingLog<'a> {
142    timestamp: DateTime<Utc>,
143    level: SerializeLevel<'a>,
144    target: &'a str,
145    thread: Thread,
146    // fields: SerializeFieldMap<'a, Event<'a>>,
147    fields: JsonValue,
148    context: Option<TracingLogContextField<'a>>,
149}
150
151impl Serialize for TracingLog<'_> {
152    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
153    where
154        S: serde::Serializer,
155    {
156        let mut state = serializer.serialize_struct("TracingLog", 7)?;
157        state.serialize_field("timestamp", &self.timestamp)?;
158        state.serialize_field("level", &self.level)?;
159        state.serialize_field("target", self.target)?;
160        state.serialize_field("threadId", &format!("{:?}", self.thread.id()))?;
161        state.serialize_field("threadName", self.thread.name().unwrap_or_default())?;
162        state.serialize_field("fields", &self.fields)?;
163        match &self.context {
164            Some(context) => state.serialize_field("context", context)?,
165            None => state.skip_field("context")?,
166        }
167        state.end()
168    }
169}
170
171#[derive(serde::Serialize)]
172struct TracingLogContextField<'a> {
173    root_span_id: u64,
174    root_span_name: &'a str,
175    span_id: u64,
176    span_name: &'a str,
177    #[serde(flatten)]
178    context: HashMap<String, JsonValue>,
179}
180
181fn strip_empty_fields(value: &mut JsonValue) {
182    match value {
183        JsonValue::Array(items) => {
184            items.iter_mut().for_each(strip_empty_fields);
185            items.retain(|item| !is_empty_value(item));
186        }
187        JsonValue::Object(map) => {
188            map.values_mut().for_each(strip_empty_fields);
189            map.retain(|_, value| !is_empty_value(value));
190        }
191        _ => {}
192    }
193}
194
195fn strip_empty_context(context: &mut HashMap<String, JsonValue>) {
196    context.values_mut().for_each(strip_empty_fields);
197    context.retain(|_, value| !is_empty_value(value));
198}
199
200fn is_empty_value(value: &JsonValue) -> bool {
201    match value {
202        JsonValue::Null => true,
203        JsonValue::String(value) => value.is_empty(),
204        _ => false,
205    }
206}
207
208// -----------------------------------------------------------------------------
209// Tracing service: - Minimal Timer
210// -----------------------------------------------------------------------------
211pub struct TracingMinimalTimer;
212
213impl FormatTime for TracingMinimalTimer {
214    fn format_time(&self, w: &mut fmt::format::Writer<'_>) -> std::fmt::Result {
215        write!(w, "{}", Local::now().time().format("%H:%M:%S%.3f"))
216    }
217}
218
219// -----------------------------------------------------------------------------
220// Tracing extensions
221// -----------------------------------------------------------------------------
222
223/// Extensions for `tracing::Span`.
224pub trait SpanExt {
225    #[cfg(feature = "tracing")]
226    /// Applies the provided function to the current span.
227    fn with<F>(fill: F)
228    where
229        F: Fn(Span),
230    {
231        let span = Span::current();
232        fill(span);
233    }
234
235    #[cfg(not(feature = "tracing"))]
236    /// Do nothing because the `tracing` function is disabled.
237    fn with<F>(_: F)
238    where
239        F: Fn(Span),
240    {
241    }
242
243    /// Records a value using `ToString` implementation.
244    fn rec_str<T>(&self, field: &'static str, value: &T)
245    where
246        T: ToString;
247
248    /// Records a value using `ToString` implementation if the option value is present.
249    fn rec_opt<T>(&self, field: &'static str, value: &Option<T>)
250    where
251        T: ToString;
252}
253
254impl SpanExt for Span {
255    fn rec_str<T>(&self, field: &'static str, value: &T)
256    where
257        T: ToString,
258    {
259        self.record(field, value.to_string().as_str());
260    }
261
262    fn rec_opt<T>(&self, field: &'static str, value: &Option<T>)
263    where
264        T: ToString,
265    {
266        if let Some(value) = value {
267            self.record(field, value.to_string().as_str());
268        }
269    }
270}
271
272// -----------------------------------------------------------------------------
273// TracingExt
274// -----------------------------------------------------------------------------
275
276/// Extensions for values used as fields in `tracing` macros.
277pub trait TracingExt {
278    /// Returns the `Display` value of the inner value or an empty string.
279    fn or_empty(&self) -> String;
280}
281
282impl<T> TracingExt for Option<T>
283where
284    T: Display + Debug,
285{
286    fn or_empty(&self) -> String {
287        match self {
288            Some(value) => value.to_string(),
289            None => String::new(),
290        }
291    }
292}
293
294// -----------------------------------------------------------------------------
295// Tracing macros
296// -----------------------------------------------------------------------------
297
298/// Logs an error and also wrap the existing error with the provided message.
299#[macro_export]
300macro_rules! log_and_err {
301    // with reason: wrap the original error with provided message
302    (reason = $error:ident, payload = $payload:expr, $msg:expr) => {
303        {
304            use anyhow::Context;
305            tracing::error!(reason = ?$error, payload = ?$payload, message = %$msg);
306            Err($error).context($msg)
307        }
308    };
309    (reason = $error:ident, $msg:expr) => {
310        {
311            use anyhow::Context;
312            tracing::error!(reason = ?$error, message = %$msg);
313            Err($error).context($msg)
314        }
315    };
316    // without reason: generate a new error using provided message
317    (payload = $payload:expr, $msg:expr) => {
318        {
319            use anyhow::Context;
320            use anyhow::anyhow;
321            tracing::error!(payload = ?$payload, message = %$msg);
322            let message = format!("{} | payload={:?}", $msg, $payload);
323            Err(anyhow!(message))
324        }
325    };
326    ($msg:expr) => {
327        {
328            use anyhow::anyhow;
329            tracing::error!(message = %$msg);
330            Err(anyhow!($msg))
331        }
332    };
333}
334
335/// Dynamic event logging based on the provided level.
336///
337/// <https://github.com/tokio-rs/tracing/issues/2730#issuecomment-1943022805>
338#[macro_export]
339macro_rules! event_with {
340    ($lvl:ident, $($arg:tt)+) => {
341        match $lvl {
342            ::tracing::Level::ERROR => ::tracing::error!($($arg)+),
343            ::tracing::Level::WARN => ::tracing::warn!($($arg)+),
344            ::tracing::Level::INFO => ::tracing::info!($($arg)+),
345            ::tracing::Level::DEBUG => ::tracing::debug!($($arg)+),
346            ::tracing::Level::TRACE => ::tracing::trace!($($arg)+),
347        }
348    };
349}
350
351// -----------------------------------------------------------------------------
352// Tracing functions
353// -----------------------------------------------------------------------------
354
355/// Creates a new possible unique correlation ID.
356///
357/// Uniqueness is not strictly necessary because traces are not stored permanently.
358pub fn new_cid() -> String {
359    pub const ALPHABET: [char; 36] = [
360        '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't',
361        'u', 'v', 'w', 'x', 'y', 'z',
362    ];
363    nanoid::nanoid!(8, &ALPHABET)
364}
365
366/// Emits an info message that a task was spawned to backgroud.
367#[track_caller]
368pub fn info_task_spawn(name: &str) {
369    tracing::info!(parent: None, %name, "spawning task");
370}
371
372/// Emits an warning that a task is exiting because it received a cancenllation signal.
373///
374/// Returns the formatted tracing message.
375#[track_caller]
376pub fn warn_task_cancellation(task: &str) -> String {
377    let message = format!("exiting {task} because it received a cancellation signal");
378    tracing::warn!(%message);
379    message
380}
381
382/// Emits an warning that a task is exiting because the tx side was closed.
383///
384/// Returns the formatted tracing message.
385#[track_caller]
386pub fn warn_task_tx_closed(task: &str) -> String {
387    let message = format!("exiting {task} because the tx channel on the receiver side was closed");
388    tracing::warn!(%message);
389    message
390}
391
392/// Emits an warning that a task is exiting because the rx side was closed.
393///
394/// Returns the formatted tracing message.
395#[track_caller]
396pub fn warn_task_rx_closed(task: &str) -> String {
397    let message = format!("exiting {task} because the rx channel on the sender side was closed");
398    tracing::warn!(%message);
399    message
400}