use std::collections::HashMap;
use std::fmt::Debug;
use std::fmt::Display;
use std::thread::Thread;
use chrono::DateTime;
use chrono::Local;
use chrono::Utc;
use serde::ser::SerializeStruct;
use serde::Serialize;
use tracing::span;
use tracing::span::Attributes;
use tracing::Span;
use tracing::Subscriber;
use tracing_serde::fields::AsMap;
use tracing_serde::fields::SerializeFieldMap;
use tracing_serde::AsSerde;
use tracing_serde::SerializeLevel;
use tracing_subscriber::fmt;
use tracing_subscriber::fmt::format::DefaultFields;
use tracing_subscriber::fmt::time::FormatTime;
use tracing_subscriber::fmt::FormatEvent;
use tracing_subscriber::registry::LookupSpan;
use tracing_subscriber::Layer;
use crate::alias::JsonValue;
use crate::ext::not;
use crate::ext::to_json_string;
use crate::ext::to_json_value;
pub struct TracingContextLayer;
impl<S> Layer<S> for TracingContextLayer
where
S: Subscriber + for<'a> LookupSpan<'a>,
{
fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &span::Id, ctx: tracing_subscriber::layer::Context<'_, S>) {
let Some(span) = ctx.span(id) else { return };
span.extensions_mut().insert(SpanFields::new(attrs.field_map()));
}
fn on_record(&self, id: &span::Id, values: &span::Record<'_>, ctx: tracing_subscriber::layer::Context<'_, S>) {
let Some(span) = ctx.span(id) else { return };
let mut extensions = span.extensions_mut();
if let Some(map) = extensions.get_mut::<SpanFields>() {
map.record(values.field_map());
}
}
}
#[derive(derive_more::Deref, derive_more::DerefMut)]
struct SpanFields(#[deref] JsonValue);
impl SpanFields {
fn new(fields: SerializeFieldMap<'_, Attributes>) -> Self {
let fields = to_json_value(fields);
Self(fields)
}
fn record(&mut self, fields: SerializeFieldMap<'_, span::Record>) {
let mut new_fields = to_json_value(fields);
let Some(new_fields) = new_fields.as_object_mut() else { return };
let Some(current_fields) = self.as_object_mut() else { return };
for (new_field_key, new_field_value) in new_fields.into_iter() {
current_fields.insert(new_field_key.to_owned(), new_field_value.to_owned());
}
}
}
pub struct TracingJsonFormatter;
impl<S> FormatEvent<S, DefaultFields> for TracingJsonFormatter
where
S: Subscriber + for<'lookup> LookupSpan<'lookup>,
{
fn format_event(&self, ctx: &fmt::FmtContext<'_, S, DefaultFields>, mut writer: fmt::format::Writer<'_>, event: &tracing::Event<'_>) -> std::fmt::Result {
let meta = event.metadata();
let context = ctx.lookup_current().map(|span| {
let mut root_span = None;
let mut merged_span_context = HashMap::new();
let mut span_iterator = span.scope().peekable();
while let Some(span) = span_iterator.next() {
if let Some(span_fields) = span.extensions().get::<SpanFields>().and_then(|fields| fields.as_object()) {
for (field_key, field_value) in span_fields {
if not(merged_span_context.contains_key(field_key)) {
merged_span_context.insert(field_key.to_owned(), field_value.to_owned());
}
}
}
if span_iterator.peek().is_none() {
root_span = Some(span);
}
}
TracingLogContextField {
root_span_id: root_span.as_ref().map(|s| s.id().into_u64()).unwrap_or(0),
root_span_name: root_span.as_ref().map(|s| s.name()).unwrap_or(""),
span_id: span.id().into_u64(),
span_name: span.name(),
context: merged_span_context,
}
});
let log = TracingLog {
timestamp: Utc::now(),
level: meta.level().as_serde(),
target: meta.target(),
thread: std::thread::current(),
fields: to_json_value(event.field_map()),
context,
};
writeln!(writer, "{}", to_json_string(&log))
}
}
#[derive(derive_new::new)]
struct TracingLog<'a> {
timestamp: DateTime<Utc>,
level: SerializeLevel<'a>,
target: &'a str,
thread: Thread,
fields: JsonValue,
context: Option<TracingLogContextField<'a>>,
}
impl Serialize for TracingLog<'_> {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let mut state = serializer.serialize_struct("TracingLog", 7)?;
state.serialize_field("timestamp", &self.timestamp)?;
state.serialize_field("level", &self.level)?;
state.serialize_field("target", self.target)?;
state.serialize_field("threadId", &format!("{:?}", self.thread.id()))?;
state.serialize_field("threadName", self.thread.name().unwrap_or_default())?;
state.serialize_field("fields", &self.fields)?;
match &self.context {
Some(context) => state.serialize_field("context", context)?,
None => state.skip_field("context")?,
}
state.end()
}
}
#[derive(serde::Serialize)]
struct TracingLogContextField<'a> {
root_span_id: u64,
root_span_name: &'a str,
span_id: u64,
span_name: &'a str,
#[serde(flatten)]
context: HashMap<String, JsonValue>,
}
pub struct TracingMinimalTimer;
impl FormatTime for TracingMinimalTimer {
fn format_time(&self, w: &mut fmt::format::Writer<'_>) -> std::fmt::Result {
write!(w, "{}", Local::now().time().format("%H:%M:%S%.3f"))
}
}
pub trait SpanExt {
#[cfg(feature = "tracing")]
fn with<F>(fill: F)
where
F: Fn(Span),
{
let span = Span::current();
fill(span);
}
#[cfg(not(feature = "tracing"))]
fn with<F>(_: F)
where
F: Fn(Span),
{
}
fn rec_str<T>(&self, field: &'static str, value: &T)
where
T: ToString;
fn rec_opt<T>(&self, field: &'static str, value: &Option<T>)
where
T: ToString;
}
impl SpanExt for Span {
fn rec_str<T>(&self, field: &'static str, value: &T)
where
T: ToString,
{
self.record(field, value.to_string().as_str());
}
fn rec_opt<T>(&self, field: &'static str, value: &Option<T>)
where
T: ToString,
{
if let Some(ref value) = value {
self.record(field, value.to_string().as_str());
}
}
}
pub trait TracingExt {
fn or_empty(&self) -> String;
}
impl<T> TracingExt for Option<T>
where
T: Display + Debug,
{
fn or_empty(&self) -> String {
match self {
Some(value) => value.to_string(),
None => String::new(),
}
}
}
#[macro_export]
macro_rules! log_and_err {
(reason = $error:ident, payload = $payload:expr, $msg:expr) => {
{
use anyhow::Context;
tracing::error!(reason = ?$error, payload = ?$payload, message = %$msg);
Err($error).context($msg)
}
};
(reason = $error:ident, $msg:expr) => {
{
use anyhow::Context;
tracing::error!(reason = ?$error, message = %$msg);
Err($error).context($msg)
}
};
(payload = $payload:expr, $msg:expr) => {
{
use anyhow::Context;
use anyhow::anyhow;
tracing::error!(payload = ?$payload, message = %$msg);
let message = format!("{} | payload={:?}", $msg, $payload);
Err(anyhow!(message))
}
};
($msg:expr) => {
{
use anyhow::anyhow;
tracing::error!(message = %$msg);
Err(anyhow!($msg))
}
};
}
#[macro_export]
macro_rules! event_with {
($lvl:ident, $($arg:tt)+) => {
match $lvl {
::tracing::Level::ERROR => ::tracing::error!($($arg)+),
::tracing::Level::WARN => ::tracing::warn!($($arg)+),
::tracing::Level::INFO => ::tracing::info!($($arg)+),
::tracing::Level::DEBUG => ::tracing::debug!($($arg)+),
::tracing::Level::TRACE => ::tracing::trace!($($arg)+),
}
};
}
pub fn new_cid() -> String {
pub const ALPHABET: [char; 36] = [
'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't',
'u', 'v', 'w', 'x', 'y', 'z',
];
nanoid::nanoid!(8, &ALPHABET)
}
#[track_caller]
pub fn info_task_spawn(name: &str) {
tracing::info!(parent: None, %name, "spawning task");
}
#[track_caller]
pub fn warn_task_cancellation(task: &str) -> String {
let message = format!("exiting {task} because it received a cancellation signal");
tracing::warn!(%message);
message
}
#[track_caller]
pub fn warn_task_tx_closed(task: &str) -> String {
let message = format!("exiting {task} because the tx channel on the receiver side was closed");
tracing::warn!(%message);
message
}
#[track_caller]
pub fn warn_task_rx_closed(task: &str) -> String {
let message = format!("exiting {task} because the rx channel on the sender side was closed");
tracing::warn!(%message);
message
}