1use std::collections::HashMap;
4use std::fmt::Debug;
5use std::fmt::Display;
6use std::thread::Thread;
7
8use chrono::DateTime;
9use chrono::Local;
10use chrono::Utc;
11use serde::Serialize;
12use serde::ser::SerializeStruct;
13use tracing::Span;
14use tracing::Subscriber;
15use tracing::span;
16use tracing::span::Attributes;
17use tracing_serde::AsSerde;
18use tracing_serde::SerializeLevel;
19use tracing_serde::fields::AsMap;
20use tracing_serde::fields::SerializeFieldMap;
21use tracing_subscriber::Layer;
22use tracing_subscriber::fmt;
23use tracing_subscriber::fmt::FormatEvent;
24use tracing_subscriber::fmt::format::DefaultFields;
25use tracing_subscriber::fmt::time::FormatTime;
26use tracing_subscriber::registry::LookupSpan;
27
28use crate::alias::JsonValue;
29use crate::ext::not;
30use crate::ext::to_json_string;
31use crate::ext::to_json_value;
32
33pub struct TracingContextLayer;
37
38impl<S> Layer<S> for TracingContextLayer
39where
40 S: Subscriber + for<'a> LookupSpan<'a>,
41{
42 fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &span::Id, ctx: tracing_subscriber::layer::Context<'_, S>) {
43 let Some(span) = ctx.span(id) else { return };
44 span.extensions_mut().insert(SpanFields::new(attrs.field_map()));
45 }
46
47 fn on_record(&self, id: &span::Id, values: &span::Record<'_>, ctx: tracing_subscriber::layer::Context<'_, S>) {
48 let Some(span) = ctx.span(id) else { return };
49 let mut extensions = span.extensions_mut();
50 if let Some(map) = extensions.get_mut::<SpanFields>() {
51 map.record(values.field_map());
52 }
53 }
54}
55
56#[derive(derive_more::Deref, derive_more::DerefMut)]
57struct SpanFields(#[deref] JsonValue);
58
59impl SpanFields {
60 fn new(fields: SerializeFieldMap<'_, Attributes>) -> Self {
61 let fields = to_json_value(fields);
62 Self(fields)
63 }
64
65 fn record(&mut self, fields: SerializeFieldMap<'_, span::Record>) {
66 let mut new_fields = to_json_value(fields);
67 let Some(new_fields) = new_fields.as_object_mut() else { return };
68
69 let Some(current_fields) = self.as_object_mut() else { return };
70 for (new_field_key, new_field_value) in new_fields.into_iter() {
71 current_fields.insert(new_field_key.to_owned(), new_field_value.to_owned());
72 }
73 }
74}
75
76pub struct TracingJsonFormatter;
81
82impl<S> FormatEvent<S, DefaultFields> for TracingJsonFormatter
83where
84 S: Subscriber + for<'lookup> LookupSpan<'lookup>,
85{
86 fn format_event(&self, ctx: &fmt::FmtContext<'_, S, DefaultFields>, mut writer: fmt::format::Writer<'_>, event: &tracing::Event<'_>) -> std::fmt::Result {
87 let meta = event.metadata();
88
89 let mut fields = to_json_value(event.field_map());
90 strip_empty_fields(&mut fields);
91
92 let context = ctx.lookup_current().map(|span| {
94 let mut root_span = None;
95 let mut merged_span_context = HashMap::new();
96
97 let mut span_iterator = span.scope().peekable();
98 while let Some(span) = span_iterator.next() {
99 if let Some(span_fields) = span.extensions().get::<SpanFields>().and_then(|fields| fields.as_object()) {
101 for (field_key, field_value) in span_fields {
102 if not(merged_span_context.contains_key(field_key)) {
103 merged_span_context.insert(field_key.to_owned(), field_value.to_owned());
104 }
105 }
106 }
107
108 if span_iterator.peek().is_none() {
110 root_span = Some(span);
111 }
112 }
113
114 strip_empty_context(&mut merged_span_context);
115
116 TracingLogContextField {
118 root_span_id: root_span.as_ref().map(|s| s.id().into_u64()).unwrap_or(0),
119 root_span_name: root_span.as_ref().map(|s| s.name()).unwrap_or(""),
120 span_id: span.id().into_u64(),
121 span_name: span.name(),
122 context: merged_span_context,
123 }
124 });
125
126 let log = TracingLog {
128 timestamp: Utc::now(),
129 level: meta.level().as_serde(),
130 target: meta.target(),
131 thread: std::thread::current(),
132 fields,
133 context,
134 };
135
136 writeln!(writer, "{}", to_json_string(&log))
137 }
138}
139
140#[derive(derive_new::new)]
141struct TracingLog<'a> {
142 timestamp: DateTime<Utc>,
143 level: SerializeLevel<'a>,
144 target: &'a str,
145 thread: Thread,
146 fields: JsonValue,
148 context: Option<TracingLogContextField<'a>>,
149}
150
151impl Serialize for TracingLog<'_> {
152 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
153 where
154 S: serde::Serializer,
155 {
156 let mut state = serializer.serialize_struct("TracingLog", 7)?;
157 state.serialize_field("timestamp", &self.timestamp)?;
158 state.serialize_field("level", &self.level)?;
159 state.serialize_field("target", self.target)?;
160 state.serialize_field("threadId", &format!("{:?}", self.thread.id()))?;
161 state.serialize_field("threadName", self.thread.name().unwrap_or_default())?;
162 state.serialize_field("fields", &self.fields)?;
163 match &self.context {
164 Some(context) => state.serialize_field("context", context)?,
165 None => state.skip_field("context")?,
166 }
167 state.end()
168 }
169}
170
171#[derive(serde::Serialize)]
172struct TracingLogContextField<'a> {
173 root_span_id: u64,
174 root_span_name: &'a str,
175 span_id: u64,
176 span_name: &'a str,
177 #[serde(flatten)]
178 context: HashMap<String, JsonValue>,
179}
180
181fn strip_empty_fields(value: &mut JsonValue) {
182 match value {
183 JsonValue::Array(items) => {
184 items.iter_mut().for_each(strip_empty_fields);
185 items.retain(|item| !is_empty_value(item));
186 }
187 JsonValue::Object(map) => {
188 map.values_mut().for_each(strip_empty_fields);
189 map.retain(|_, value| !is_empty_value(value));
190 }
191 _ => {}
192 }
193}
194
195fn strip_empty_context(context: &mut HashMap<String, JsonValue>) {
196 context.values_mut().for_each(strip_empty_fields);
197 context.retain(|_, value| !is_empty_value(value));
198}
199
200fn is_empty_value(value: &JsonValue) -> bool {
201 match value {
202 JsonValue::Null => true,
203 JsonValue::String(value) => value.is_empty(),
204 _ => false,
205 }
206}
207
208pub struct TracingMinimalTimer;
212
213impl FormatTime for TracingMinimalTimer {
214 fn format_time(&self, w: &mut fmt::format::Writer<'_>) -> std::fmt::Result {
215 write!(w, "{}", Local::now().time().format("%H:%M:%S%.3f"))
216 }
217}
218
219pub trait SpanExt {
225 #[cfg(feature = "tracing")]
226 fn with<F>(fill: F)
228 where
229 F: Fn(Span),
230 {
231 let span = Span::current();
232 fill(span);
233 }
234
235 #[cfg(not(feature = "tracing"))]
236 fn with<F>(_: F)
238 where
239 F: Fn(Span),
240 {
241 }
242
243 fn rec_str<T>(&self, field: &'static str, value: &T)
245 where
246 T: ToString;
247
248 fn rec_opt<T>(&self, field: &'static str, value: &Option<T>)
250 where
251 T: ToString;
252}
253
254impl SpanExt for Span {
255 fn rec_str<T>(&self, field: &'static str, value: &T)
256 where
257 T: ToString,
258 {
259 self.record(field, value.to_string().as_str());
260 }
261
262 fn rec_opt<T>(&self, field: &'static str, value: &Option<T>)
263 where
264 T: ToString,
265 {
266 if let Some(value) = value {
267 self.record(field, value.to_string().as_str());
268 }
269 }
270}
271
272pub trait TracingExt {
278 fn or_empty(&self) -> String;
280}
281
282impl<T> TracingExt for Option<T>
283where
284 T: Display + Debug,
285{
286 fn or_empty(&self) -> String {
287 match self {
288 Some(value) => value.to_string(),
289 None => String::new(),
290 }
291 }
292}
293
294#[macro_export]
300macro_rules! log_and_err {
301 (reason = $error:ident, payload = $payload:expr, $msg:expr) => {
303 {
304 use anyhow::Context;
305 tracing::error!(reason = ?$error, payload = ?$payload, message = %$msg);
306 Err($error).context($msg)
307 }
308 };
309 (reason = $error:ident, $msg:expr) => {
310 {
311 use anyhow::Context;
312 tracing::error!(reason = ?$error, message = %$msg);
313 Err($error).context($msg)
314 }
315 };
316 (payload = $payload:expr, $msg:expr) => {
318 {
319 use anyhow::Context;
320 use anyhow::anyhow;
321 tracing::error!(payload = ?$payload, message = %$msg);
322 let message = format!("{} | payload={:?}", $msg, $payload);
323 Err(anyhow!(message))
324 }
325 };
326 ($msg:expr) => {
327 {
328 use anyhow::anyhow;
329 tracing::error!(message = %$msg);
330 Err(anyhow!($msg))
331 }
332 };
333}
334
335#[macro_export]
339macro_rules! event_with {
340 ($lvl:ident, $($arg:tt)+) => {
341 match $lvl {
342 ::tracing::Level::ERROR => ::tracing::error!($($arg)+),
343 ::tracing::Level::WARN => ::tracing::warn!($($arg)+),
344 ::tracing::Level::INFO => ::tracing::info!($($arg)+),
345 ::tracing::Level::DEBUG => ::tracing::debug!($($arg)+),
346 ::tracing::Level::TRACE => ::tracing::trace!($($arg)+),
347 }
348 };
349}
350
351pub fn new_cid() -> String {
359 pub const ALPHABET: [char; 36] = [
360 '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't',
361 'u', 'v', 'w', 'x', 'y', 'z',
362 ];
363 nanoid::nanoid!(8, &ALPHABET)
364}
365
366#[track_caller]
368pub fn info_task_spawn(name: &str) {
369 tracing::info!(parent: None, %name, "spawning task");
370}
371
372#[track_caller]
376pub fn warn_task_cancellation(task: &str) -> String {
377 let message = format!("exiting {task} because it received a cancellation signal");
378 tracing::warn!(%message);
379 message
380}
381
382#[track_caller]
386pub fn warn_task_tx_closed(task: &str) -> String {
387 let message = format!("exiting {task} because the tx channel on the receiver side was closed");
388 tracing::warn!(%message);
389 message
390}
391
392#[track_caller]
396pub fn warn_task_rx_closed(task: &str) -> String {
397 let message = format!("exiting {task} because the rx channel on the sender side was closed");
398 tracing::warn!(%message);
399 message
400}