1use std::collections::HashMap;
4use std::fmt::Debug;
5use std::fmt::Display;
6use std::thread::Thread;
7
8use chrono::DateTime;
9use chrono::Local;
10use chrono::Utc;
11use serde::Serialize;
12use serde::ser::SerializeStruct;
13use tracing::Span;
14use tracing::Subscriber;
15use tracing::span;
16use tracing::span::Attributes;
17use tracing_serde::AsSerde;
18use tracing_serde::SerializeLevel;
19use tracing_serde::fields::AsMap;
20use tracing_serde::fields::SerializeFieldMap;
21use tracing_subscriber::Layer;
22use tracing_subscriber::fmt;
23use tracing_subscriber::fmt::FormatEvent;
24use tracing_subscriber::fmt::format::DefaultFields;
25use tracing_subscriber::fmt::time::FormatTime;
26use tracing_subscriber::registry::LookupSpan;
27
28use crate::alias::JsonValue;
29use crate::ext::not;
30use crate::ext::to_json_string;
31use crate::ext::to_json_value;
32
33pub struct TracingContextLayer;
37
38impl<S> Layer<S> for TracingContextLayer
39where
40 S: Subscriber + for<'a> LookupSpan<'a>,
41{
42 fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &span::Id, ctx: tracing_subscriber::layer::Context<'_, S>) {
43 let Some(span) = ctx.span(id) else { return };
44 span.extensions_mut().insert(SpanFields::new(attrs.field_map()));
45 }
46
47 fn on_record(&self, id: &span::Id, values: &span::Record<'_>, ctx: tracing_subscriber::layer::Context<'_, S>) {
48 let Some(span) = ctx.span(id) else { return };
49 let mut extensions = span.extensions_mut();
50 if let Some(map) = extensions.get_mut::<SpanFields>() {
51 map.record(values.field_map());
52 }
53 }
54}
55
56#[derive(derive_more::Deref, derive_more::DerefMut)]
57struct SpanFields(#[deref] JsonValue);
58
59impl SpanFields {
60 fn new(fields: SerializeFieldMap<'_, Attributes>) -> Self {
61 let fields = to_json_value(fields);
62 Self(fields)
63 }
64
65 fn record(&mut self, fields: SerializeFieldMap<'_, span::Record>) {
66 let mut new_fields = to_json_value(fields);
67 let Some(new_fields) = new_fields.as_object_mut() else { return };
68
69 let Some(current_fields) = self.as_object_mut() else { return };
70 for (new_field_key, new_field_value) in new_fields.into_iter() {
71 current_fields.insert(new_field_key.to_owned(), new_field_value.to_owned());
72 }
73 }
74}
75
76pub struct TracingJsonFormatter;
81
82impl<S> FormatEvent<S, DefaultFields> for TracingJsonFormatter
83where
84 S: Subscriber + for<'lookup> LookupSpan<'lookup>,
85{
86 fn format_event(&self, ctx: &fmt::FmtContext<'_, S, DefaultFields>, mut writer: fmt::format::Writer<'_>, event: &tracing::Event<'_>) -> std::fmt::Result {
87 let meta = event.metadata();
88
89 let context = ctx.lookup_current().map(|span| {
91 let mut root_span = None;
92 let mut merged_span_context = HashMap::new();
93
94 let mut span_iterator = span.scope().peekable();
95 while let Some(span) = span_iterator.next() {
96 if let Some(span_fields) = span.extensions().get::<SpanFields>().and_then(|fields| fields.as_object()) {
98 for (field_key, field_value) in span_fields {
99 if not(merged_span_context.contains_key(field_key)) {
100 merged_span_context.insert(field_key.to_owned(), field_value.to_owned());
101 }
102 }
103 }
104
105 if span_iterator.peek().is_none() {
107 root_span = Some(span);
108 }
109 }
110
111 TracingLogContextField {
113 root_span_id: root_span.as_ref().map(|s| s.id().into_u64()).unwrap_or(0),
114 root_span_name: root_span.as_ref().map(|s| s.name()).unwrap_or(""),
115 span_id: span.id().into_u64(),
116 span_name: span.name(),
117 context: merged_span_context,
118 }
119 });
120
121 let log = TracingLog {
123 timestamp: Utc::now(),
124 level: meta.level().as_serde(),
125 target: meta.target(),
126 thread: std::thread::current(),
127 fields: to_json_value(event.field_map()),
128 context,
129 };
130
131 writeln!(writer, "{}", to_json_string(&log))
132 }
133}
134
135#[derive(derive_new::new)]
136struct TracingLog<'a> {
137 timestamp: DateTime<Utc>,
138 level: SerializeLevel<'a>,
139 target: &'a str,
140 thread: Thread,
141 fields: JsonValue,
143 context: Option<TracingLogContextField<'a>>,
144}
145
146impl Serialize for TracingLog<'_> {
147 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
148 where
149 S: serde::Serializer,
150 {
151 let mut state = serializer.serialize_struct("TracingLog", 7)?;
152 state.serialize_field("timestamp", &self.timestamp)?;
153 state.serialize_field("level", &self.level)?;
154 state.serialize_field("target", self.target)?;
155 state.serialize_field("threadId", &format!("{:?}", self.thread.id()))?;
156 state.serialize_field("threadName", self.thread.name().unwrap_or_default())?;
157 state.serialize_field("fields", &self.fields)?;
158 match &self.context {
159 Some(context) => state.serialize_field("context", context)?,
160 None => state.skip_field("context")?,
161 }
162 state.end()
163 }
164}
165
166#[derive(serde::Serialize)]
167struct TracingLogContextField<'a> {
168 root_span_id: u64,
169 root_span_name: &'a str,
170 span_id: u64,
171 span_name: &'a str,
172 #[serde(flatten)]
173 context: HashMap<String, JsonValue>,
174}
175
176pub struct TracingMinimalTimer;
180
181impl FormatTime for TracingMinimalTimer {
182 fn format_time(&self, w: &mut fmt::format::Writer<'_>) -> std::fmt::Result {
183 write!(w, "{}", Local::now().time().format("%H:%M:%S%.3f"))
184 }
185}
186
187pub trait SpanExt {
193 #[cfg(feature = "tracing")]
194 fn with<F>(fill: F)
196 where
197 F: Fn(Span),
198 {
199 let span = Span::current();
200 fill(span);
201 }
202
203 #[cfg(not(feature = "tracing"))]
204 fn with<F>(_: F)
206 where
207 F: Fn(Span),
208 {
209 }
210
211 fn rec_str<T>(&self, field: &'static str, value: &T)
213 where
214 T: ToString;
215
216 fn rec_opt<T>(&self, field: &'static str, value: &Option<T>)
218 where
219 T: ToString;
220}
221
222impl SpanExt for Span {
223 fn rec_str<T>(&self, field: &'static str, value: &T)
224 where
225 T: ToString,
226 {
227 self.record(field, value.to_string().as_str());
228 }
229
230 fn rec_opt<T>(&self, field: &'static str, value: &Option<T>)
231 where
232 T: ToString,
233 {
234 if let Some(value) = value {
235 self.record(field, value.to_string().as_str());
236 }
237 }
238}
239
240pub trait TracingExt {
246 fn or_empty(&self) -> String;
248}
249
250impl<T> TracingExt for Option<T>
251where
252 T: Display + Debug,
253{
254 fn or_empty(&self) -> String {
255 match self {
256 Some(value) => value.to_string(),
257 None => String::new(),
258 }
259 }
260}
261
262#[macro_export]
268macro_rules! log_and_err {
269 (reason = $error:ident, payload = $payload:expr, $msg:expr) => {
271 {
272 use anyhow::Context;
273 tracing::error!(reason = ?$error, payload = ?$payload, message = %$msg);
274 Err($error).context($msg)
275 }
276 };
277 (reason = $error:ident, $msg:expr) => {
278 {
279 use anyhow::Context;
280 tracing::error!(reason = ?$error, message = %$msg);
281 Err($error).context($msg)
282 }
283 };
284 (payload = $payload:expr, $msg:expr) => {
286 {
287 use anyhow::Context;
288 use anyhow::anyhow;
289 tracing::error!(payload = ?$payload, message = %$msg);
290 let message = format!("{} | payload={:?}", $msg, $payload);
291 Err(anyhow!(message))
292 }
293 };
294 ($msg:expr) => {
295 {
296 use anyhow::anyhow;
297 tracing::error!(message = %$msg);
298 Err(anyhow!($msg))
299 }
300 };
301}
302
303#[macro_export]
307macro_rules! event_with {
308 ($lvl:ident, $($arg:tt)+) => {
309 match $lvl {
310 ::tracing::Level::ERROR => ::tracing::error!($($arg)+),
311 ::tracing::Level::WARN => ::tracing::warn!($($arg)+),
312 ::tracing::Level::INFO => ::tracing::info!($($arg)+),
313 ::tracing::Level::DEBUG => ::tracing::debug!($($arg)+),
314 ::tracing::Level::TRACE => ::tracing::trace!($($arg)+),
315 }
316 };
317}
318
319pub fn new_cid() -> String {
327 pub const ALPHABET: [char; 36] = [
328 '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't',
329 'u', 'v', 'w', 'x', 'y', 'z',
330 ];
331 nanoid::nanoid!(8, &ALPHABET)
332}
333
334#[track_caller]
336pub fn info_task_spawn(name: &str) {
337 tracing::info!(parent: None, %name, "spawning task");
338}
339
340#[track_caller]
344pub fn warn_task_cancellation(task: &str) -> String {
345 let message = format!("exiting {task} because it received a cancellation signal");
346 tracing::warn!(%message);
347 message
348}
349
350#[track_caller]
354pub fn warn_task_tx_closed(task: &str) -> String {
355 let message = format!("exiting {task} because the tx channel on the receiver side was closed");
356 tracing::warn!(%message);
357 message
358}
359
360#[track_caller]
364pub fn warn_task_rx_closed(task: &str) -> String {
365 let message = format!("exiting {task} because the rx channel on the sender side was closed");
366 tracing::warn!(%message);
367 message
368}