use std::fmt::Debug;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::LazyLock;
use chrono::DateTime;
use chrono::Utc;
use parking_lot::Mutex;
use sentry::ClientInitGuard;
use serde::Deserialize;
use serde::Serialize;
use serde_json::json;
use tokio::runtime::Runtime;
use tokio::sync::Semaphore;
use tokio_util::sync::CancellationToken;
use crate::alias::JsonValue;
use crate::config;
use crate::config::StratusConfig;
use crate::config::WithCommonConfig;
use crate::eth::follower::importer::Importer;
use crate::eth::rpc::RpcContext;
use crate::ext::not;
use crate::ext::spawn_signal_handler;
use crate::infra::tracing::warn_task_cancellation;
pub struct GlobalServices<T>
where
T: clap::Parser + WithCommonConfig + Debug,
{
pub config: T,
pub runtime: Runtime,
_sentry_guard: Option<ClientInitGuard>,
}
impl<T> GlobalServices<T>
where
T: clap::Parser + WithCommonConfig + Debug,
{
#[allow(clippy::expect_used)]
pub fn init() -> Self
where
T: clap::Parser + WithCommonConfig + Debug,
{
GlobalState::setup_start_time();
config::load_dotenv_file();
config::load_env_aliases();
let config = T::parse();
let common = config.common();
GlobalState::set_unknown_client_enabled(common.unknown_client_enabled);
let tokio = common.init_tokio_runtime().expect("failed to init tokio runtime");
tokio.block_on(async {
common.tracing.init(&common.sentry).expect("failed to init tracing");
});
common.metrics.init().expect("failed to init metrics");
let sentry_guard = common
.sentry
.as_ref()
.map(|sentry_config| sentry_config.init(common.env).expect("failed to init sentry"));
tokio.block_on(spawn_signal_handler()).expect("failed to init signal handlers");
Self {
config,
runtime: tokio,
_sentry_guard: sentry_guard,
}
}
}
#[derive(Clone, Copy, PartialEq, Eq, Debug, strum::Display)]
pub enum NodeMode {
#[strum(to_string = "leader")]
Leader,
#[strum(to_string = "follower")]
Follower,
#[strum(to_string = "fake-leader")]
FakeLeader,
}
pub static STRATUS_SHUTDOWN_SIGNAL: LazyLock<CancellationToken> = LazyLock::new(CancellationToken::new);
static IMPORTER_SHUTDOWN: AtomicBool = AtomicBool::new(true);
pub static IMPORTER_ONLINE_TASKS_SEMAPHORE: LazyLock<Semaphore> = LazyLock::new(|| Semaphore::new(Importer::TASKS_COUNT));
static TRANSACTIONS_ENABLED: AtomicBool = AtomicBool::new(true);
static UNKNOWN_CLIENT_ENABLED: AtomicBool = AtomicBool::new(true);
static NODE_MODE: Mutex<NodeMode> = Mutex::new(NodeMode::Follower);
static START_TIME: LazyLock<DateTime<Utc>> = LazyLock::new(Utc::now);
#[derive(Serialize, Deserialize, Debug)]
pub struct GlobalState;
impl GlobalState {
pub fn shutdown_from(caller: &str, reason: &str) -> String {
tracing::warn!(%caller, %reason, "application is shutting down");
STRATUS_SHUTDOWN_SIGNAL.cancel();
format!("{caller} {reason}")
}
pub fn is_shutdown() -> bool {
STRATUS_SHUTDOWN_SIGNAL.is_cancelled()
}
pub fn is_shutdown_warn(task_name: &str) -> bool {
let shutdown = Self::is_shutdown();
if shutdown {
warn_task_cancellation(task_name);
}
shutdown
}
pub async fn wait_shutdown() {
STRATUS_SHUTDOWN_SIGNAL.cancelled().await;
}
pub async fn wait_shutdown_warn(task_name: &str) {
Self::wait_shutdown().await;
warn_task_cancellation(task_name);
}
pub fn shutdown_importer_from(caller: &str, reason: &str) -> String {
tracing::warn!(%caller, %reason, "importer is shutting down");
Self::set_importer_shutdown(true);
format!("{caller} {reason}")
}
pub fn is_importer_shutdown() -> bool {
IMPORTER_SHUTDOWN.load(Ordering::Relaxed)
}
pub async fn wait_for_importer_to_finish() {
let result = IMPORTER_ONLINE_TASKS_SEMAPHORE.acquire_many(Importer::TASKS_COUNT as u32).await;
if let Err(e) = result {
tracing::error!(reason = ?e, "error waiting for importer to finish");
}
}
pub fn is_importer_shutdown_warn(task_name: &str) -> bool {
let shutdown = Self::is_importer_shutdown();
if shutdown {
warn_task_cancellation(task_name);
}
shutdown
}
pub fn set_importer_shutdown(shutdown: bool) {
IMPORTER_SHUTDOWN.store(shutdown, Ordering::Relaxed);
}
pub fn set_transactions_enabled(enabled: bool) {
TRANSACTIONS_ENABLED.store(enabled, Ordering::Relaxed);
}
pub fn is_transactions_enabled() -> bool {
TRANSACTIONS_ENABLED.load(Ordering::Relaxed)
}
pub fn set_unknown_client_enabled(enabled: bool) {
UNKNOWN_CLIENT_ENABLED.store(enabled, Ordering::Relaxed);
}
pub fn is_unknown_client_enabled() -> bool {
UNKNOWN_CLIENT_ENABLED.load(Ordering::Relaxed)
}
pub fn initialize_node_mode(config: &StratusConfig) {
let StratusConfig {
follower, leader, fake_leader, ..
} = config;
let mode = match (follower, leader, fake_leader) {
(true, false, false) => NodeMode::Follower,
(false, true, false) => NodeMode::Leader,
(false, false, true) => NodeMode::FakeLeader,
_ => unreachable!("exactly one must be true, config should be checked by clap"),
};
Self::set_node_mode(mode);
let should_run_importer = mode != NodeMode::Leader;
Self::set_importer_shutdown(not(should_run_importer));
}
pub fn set_node_mode(mode: NodeMode) {
*NODE_MODE.lock() = mode;
}
pub fn get_node_mode() -> NodeMode {
*NODE_MODE.lock()
}
pub fn get_global_state_as_json(ctx: &RpcContext) -> JsonValue {
let start_time = Self::get_start_time();
let elapsed_time = {
let delta = start_time.signed_duration_since(Utc::now()).abs();
let seconds = delta.num_seconds() % 60;
let minutes = delta.num_minutes() % 60;
let hours = delta.num_hours() % 24;
let days = delta.num_days();
format!("{days} days and {hours:02}:{minutes:02}:{seconds:02} elapsed")
};
json!({
"is_leader": Self::get_node_mode() == NodeMode::Leader || Self::get_node_mode() == NodeMode::FakeLeader,
"is_shutdown": Self::is_shutdown(),
"is_importer_shutdown": Self::is_importer_shutdown(),
"is_interval_miner_running": ctx.miner.is_interval_miner_running(),
"transactions_enabled": Self::is_transactions_enabled(),
"miner_paused": ctx.miner.is_paused(),
"unknown_client_enabled": Self::is_unknown_client_enabled(),
"start_time": start_time.format("%d/%m/%Y %H:%M UTC").to_string(),
"elapsed_time": elapsed_time,
})
}
fn get_start_time() -> DateTime<Utc> {
*START_TIME
}
pub fn setup_start_time() {
LazyLock::force(&START_TIME);
}
}