stratus/infra/tracing/
tracing_services.rs

1//! Tracing services.
2
3use std::collections::HashMap;
4use std::fmt::Debug;
5use std::fmt::Display;
6use std::thread::Thread;
7
8use chrono::DateTime;
9use chrono::Local;
10use chrono::Utc;
11use serde::Serialize;
12use serde::ser::SerializeStruct;
13use tracing::Span;
14use tracing::Subscriber;
15use tracing::span;
16use tracing::span::Attributes;
17use tracing_serde::AsSerde;
18use tracing_serde::SerializeLevel;
19use tracing_serde::fields::AsMap;
20use tracing_serde::fields::SerializeFieldMap;
21use tracing_subscriber::Layer;
22use tracing_subscriber::fmt;
23use tracing_subscriber::fmt::FormatEvent;
24use tracing_subscriber::fmt::format::DefaultFields;
25use tracing_subscriber::fmt::time::FormatTime;
26use tracing_subscriber::registry::LookupSpan;
27
28use crate::alias::JsonValue;
29use crate::ext::not;
30use crate::ext::to_json_string;
31use crate::ext::to_json_value;
32
33// -----------------------------------------------------------------------------
34// Tracing service: Span field recorder
35// -----------------------------------------------------------------------------
36pub struct TracingContextLayer;
37
38impl<S> Layer<S> for TracingContextLayer
39where
40    S: Subscriber + for<'a> LookupSpan<'a>,
41{
42    fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &span::Id, ctx: tracing_subscriber::layer::Context<'_, S>) {
43        let Some(span) = ctx.span(id) else { return };
44        span.extensions_mut().insert(SpanFields::new(attrs.field_map()));
45    }
46
47    fn on_record(&self, id: &span::Id, values: &span::Record<'_>, ctx: tracing_subscriber::layer::Context<'_, S>) {
48        let Some(span) = ctx.span(id) else { return };
49        let mut extensions = span.extensions_mut();
50        if let Some(map) = extensions.get_mut::<SpanFields>() {
51            map.record(values.field_map());
52        }
53    }
54}
55
56#[derive(derive_more::Deref, derive_more::DerefMut)]
57struct SpanFields(#[deref] JsonValue);
58
59impl SpanFields {
60    fn new(fields: SerializeFieldMap<'_, Attributes>) -> Self {
61        let fields = to_json_value(fields);
62        Self(fields)
63    }
64
65    fn record(&mut self, fields: SerializeFieldMap<'_, span::Record>) {
66        let mut new_fields = to_json_value(fields);
67        let Some(new_fields) = new_fields.as_object_mut() else { return };
68
69        let Some(current_fields) = self.as_object_mut() else { return };
70        for (new_field_key, new_field_value) in new_fields.into_iter() {
71            current_fields.insert(new_field_key.to_owned(), new_field_value.to_owned());
72        }
73    }
74}
75
76// -----------------------------------------------------------------------------
77// Tracing service: - Json Formatter
78// -----------------------------------------------------------------------------
79
80pub struct TracingJsonFormatter;
81
82impl<S> FormatEvent<S, DefaultFields> for TracingJsonFormatter
83where
84    S: Subscriber + for<'lookup> LookupSpan<'lookup>,
85{
86    fn format_event(&self, ctx: &fmt::FmtContext<'_, S, DefaultFields>, mut writer: fmt::format::Writer<'_>, event: &tracing::Event<'_>) -> std::fmt::Result {
87        let meta = event.metadata();
88
89        // parse spans
90        let context = ctx.lookup_current().map(|span| {
91            let mut root_span = None;
92            let mut merged_span_context = HashMap::new();
93
94            let mut span_iterator = span.scope().peekable();
95            while let Some(span) = span_iterator.next() {
96                // merge span data into a single context
97                if let Some(span_fields) = span.extensions().get::<SpanFields>().and_then(|fields| fields.as_object()) {
98                    for (field_key, field_value) in span_fields {
99                        if not(merged_span_context.contains_key(field_key)) {
100                            merged_span_context.insert(field_key.to_owned(), field_value.to_owned());
101                        }
102                    }
103                }
104
105                // track root span
106                if span_iterator.peek().is_none() {
107                    root_span = Some(span);
108                }
109            }
110
111            // generate context field
112            TracingLogContextField {
113                root_span_id: root_span.as_ref().map(|s| s.id().into_u64()).unwrap_or(0),
114                root_span_name: root_span.as_ref().map(|s| s.name()).unwrap_or(""),
115                span_id: span.id().into_u64(),
116                span_name: span.name(),
117                context: merged_span_context,
118            }
119        });
120
121        // parse metadata and event
122        let log = TracingLog {
123            timestamp: Utc::now(),
124            level: meta.level().as_serde(),
125            target: meta.target(),
126            thread: std::thread::current(),
127            fields: to_json_value(event.field_map()),
128            context,
129        };
130
131        writeln!(writer, "{}", to_json_string(&log))
132    }
133}
134
135#[derive(derive_new::new)]
136struct TracingLog<'a> {
137    timestamp: DateTime<Utc>,
138    level: SerializeLevel<'a>,
139    target: &'a str,
140    thread: Thread,
141    // fields: SerializeFieldMap<'a, Event<'a>>,
142    fields: JsonValue,
143    context: Option<TracingLogContextField<'a>>,
144}
145
146impl Serialize for TracingLog<'_> {
147    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
148    where
149        S: serde::Serializer,
150    {
151        let mut state = serializer.serialize_struct("TracingLog", 7)?;
152        state.serialize_field("timestamp", &self.timestamp)?;
153        state.serialize_field("level", &self.level)?;
154        state.serialize_field("target", self.target)?;
155        state.serialize_field("threadId", &format!("{:?}", self.thread.id()))?;
156        state.serialize_field("threadName", self.thread.name().unwrap_or_default())?;
157        state.serialize_field("fields", &self.fields)?;
158        match &self.context {
159            Some(context) => state.serialize_field("context", context)?,
160            None => state.skip_field("context")?,
161        }
162        state.end()
163    }
164}
165
/// Span information attached to a log entry.
#[derive(serde::Serialize)]
struct TracingLogContextField<'a> {
    /// ID of the root span of the scope the event was emitted in.
    root_span_id: u64,
    /// Name of the root span of the scope the event was emitted in.
    root_span_name: &'a str,
    /// ID of the span that was current when the event was emitted.
    span_id: u64,
    /// Name of the span that was current when the event was emitted.
    span_name: &'a str,
    /// Fields merged from the spans in scope; flattened, so each entry is serialized as a sibling
    /// of the span identifiers above instead of under a nested `context` key.
    #[serde(flatten)]
    context: HashMap<String, JsonValue>,
}
175
176// -----------------------------------------------------------------------------
177// Tracing service: - Minimal Timer
178// -----------------------------------------------------------------------------
179pub struct TracingMinimalTimer;
180
181impl FormatTime for TracingMinimalTimer {
182    fn format_time(&self, w: &mut fmt::format::Writer<'_>) -> std::fmt::Result {
183        write!(w, "{}", Local::now().time().format("%H:%M:%S%.3f"))
184    }
185}
186
187// -----------------------------------------------------------------------------
188// Tracing extensions
189// -----------------------------------------------------------------------------
190
191/// Extensions for `tracing::Span`.
192pub trait SpanExt {
193    #[cfg(feature = "tracing")]
194    /// Applies the provided function to the current span.
195    fn with<F>(fill: F)
196    where
197        F: Fn(Span),
198    {
199        let span = Span::current();
200        fill(span);
201    }
202
203    #[cfg(not(feature = "tracing"))]
204    /// Do nothing because the `tracing` function is disabled.
205    fn with<F>(_: F)
206    where
207        F: Fn(Span),
208    {
209    }
210
211    /// Records a value using `ToString` implementation.
212    fn rec_str<T>(&self, field: &'static str, value: &T)
213    where
214        T: ToString;
215
216    /// Records a value using `ToString` implementation if the option value is present.
217    fn rec_opt<T>(&self, field: &'static str, value: &Option<T>)
218    where
219        T: ToString;
220}
221
222impl SpanExt for Span {
223    fn rec_str<T>(&self, field: &'static str, value: &T)
224    where
225        T: ToString,
226    {
227        self.record(field, value.to_string().as_str());
228    }
229
230    fn rec_opt<T>(&self, field: &'static str, value: &Option<T>)
231    where
232        T: ToString,
233    {
234        if let Some(value) = value {
235            self.record(field, value.to_string().as_str());
236        }
237    }
238}
239
240// -----------------------------------------------------------------------------
241// TracingExt
242// -----------------------------------------------------------------------------
243
/// Extensions for values used as fields in `tracing` macros.
pub trait TracingExt {
    /// Renders the inner value with `Display`, or returns an empty string when there is no value.
    fn or_empty(&self) -> String;
}

impl<T: Display + Debug> TracingExt for Option<T> {
    fn or_empty(&self) -> String {
        // `None` falls back to `String::default()`, which is the empty string
        self.as_ref().map(ToString::to_string).unwrap_or_default()
    }
}
261
262// -----------------------------------------------------------------------------
263// Tracing macros
264// -----------------------------------------------------------------------------
265
/// Logs an error and returns it as an `Err`, wrapping an existing error with the provided message
/// when one is given.
///
/// Supported forms:
/// - `log_and_err!(reason = error, payload = value, msg)`: logs `reason`, `payload` and `message`,
///   then returns `Err(error)` with `msg` attached as `anyhow` context.
/// - `log_and_err!(reason = error, msg)`: same as above, without the payload.
/// - `log_and_err!(payload = value, msg)`: logs `payload` and `message`, then returns a new
///   `anyhow` error whose message is `"{msg} | payload={value:?}"`.
/// - `log_and_err!(msg)`: logs `message`, then returns a new `anyhow` error built from `msg`.
///
/// `reason` must be an identifier; `reason` and `payload` are logged with `Debug` and the message
/// with `Display`.
#[macro_export]
macro_rules! log_and_err {
    // Paths are absolute because they are resolved where the macro is invoked: a local item named
    // `tracing` or `anyhow` at the call site must not capture them.

    // with reason: wrap the original error with provided message
    (reason = $error:ident, payload = $payload:expr, $msg:expr) => {
        {
            use ::anyhow::Context as _;
            ::tracing::error!(reason = ?$error, payload = ?$payload, message = %$msg);
            Err($error).context($msg)
        }
    };
    (reason = $error:ident, $msg:expr) => {
        {
            use ::anyhow::Context as _;
            ::tracing::error!(reason = ?$error, message = %$msg);
            Err($error).context($msg)
        }
    };
    // without reason: generate a new error using provided message
    (payload = $payload:expr, $msg:expr) => {
        {
            ::tracing::error!(payload = ?$payload, message = %$msg);
            let message = format!("{} | payload={:?}", $msg, $payload);
            Err(::anyhow::anyhow!(message))
        }
    };
    ($msg:expr) => {
        {
            ::tracing::error!(message = %$msg);
            Err(::anyhow::anyhow!($msg))
        }
    };
}
302
/// Dynamic event logging based on the provided level.
///
/// `$lvl` must be an identifier; it is matched against the five `tracing::Level` constants and the
/// remaining tokens are forwarded unchanged to the `tracing` macro of the matching level
/// (`error!`, `warn!`, `info!`, `debug!` or `trace!`).
///
/// <https://github.com/tokio-rs/tracing/issues/2730#issuecomment-1943022805>
#[macro_export]
macro_rules! event_with {
    ($lvl:ident, $($arg:tt)+) => {
        match $lvl {
            ::tracing::Level::ERROR => ::tracing::error!($($arg)+),
            ::tracing::Level::WARN => ::tracing::warn!($($arg)+),
            ::tracing::Level::INFO => ::tracing::info!($($arg)+),
            ::tracing::Level::DEBUG => ::tracing::debug!($($arg)+),
            ::tracing::Level::TRACE => ::tracing::trace!($($arg)+),
        }
    };
}
318
319// -----------------------------------------------------------------------------
320// Tracing functions
321// -----------------------------------------------------------------------------
322
323/// Creates a new possible unique correlation ID.
324///
325/// Uniqueness is not strictly necessary because traces are not stored permanently.
326pub fn new_cid() -> String {
327    pub const ALPHABET: [char; 36] = [
328        '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't',
329        'u', 'v', 'w', 'x', 'y', 'z',
330    ];
331    nanoid::nanoid!(8, &ALPHABET)
332}
333
/// Emits an info message that a task was spawned to the background.
///
/// The event is emitted with `parent: None` and carries the task name in the `name` field.
#[track_caller]
pub fn info_task_spawn(name: &str) {
    tracing::info!(parent: None, %name, "spawning task");
}
339
/// Emits a warning that a task is exiting because it received a cancellation signal.
///
/// Returns the formatted tracing message.
#[track_caller]
pub fn warn_task_cancellation(task: &str) -> String {
    let message = format!("exiting {task} because it received a cancellation signal");
    // the variable name is also the name of the emitted field
    tracing::warn!(%message);
    message
}
349
/// Emits a warning that a task is exiting because the tx side was closed.
///
/// Returns the formatted tracing message.
#[track_caller]
pub fn warn_task_tx_closed(task: &str) -> String {
    let message = format!("exiting {task} because the tx channel on the receiver side was closed");
    // the variable name is also the name of the emitted field
    tracing::warn!(%message);
    message
}
359
/// Emits a warning that a task is exiting because the rx side was closed.
///
/// Returns the formatted tracing message.
#[track_caller]
pub fn warn_task_rx_closed(task: &str) -> String {
    let message = format!("exiting {task} because the rx channel on the sender side was closed");
    // the variable name is also the name of the emitted field
    tracing::warn!(%message);
    message
}