1use std::fmt::Debug;
2use std::sync::LazyLock;
3use std::sync::atomic::AtomicBool;
4use std::sync::atomic::Ordering;
5
6use chrono::DateTime;
7use chrono::Utc;
8use parking_lot::Mutex;
9use sentry::ClientInitGuard;
10use serde::Deserialize;
11use serde::Serialize;
12use serde_json::json;
13use tokio::runtime::Runtime;
14use tokio::sync::Semaphore;
15use tokio::sync::watch::Sender;
16use tokio_util::sync::CancellationToken;
17
18use crate::alias::JsonValue;
19use crate::config;
20use crate::config::StratusConfig;
21use crate::config::WithCommonConfig;
22use crate::eth::rpc::RpcContext;
23use crate::ext::not;
24use crate::ext::spawn_signal_handler;
25use crate::infra::tracing::warn_task_cancellation;
26
27pub struct GlobalServices<T>
32where
33 T: clap::Parser + WithCommonConfig + Debug,
34{
35 pub config: T,
36 pub runtime: Runtime,
37 _sentry_guard: Option<ClientInitGuard>,
38}
39
40impl<T> GlobalServices<T>
41where
42 T: clap::Parser + WithCommonConfig + Debug,
43{
44 #[allow(clippy::expect_used)]
45 pub fn init() -> Self
47 where
48 T: clap::Parser + WithCommonConfig + Debug,
49 {
50 GlobalState::setup_start_time();
51
52 config::load_dotenv_file();
54 config::load_env_aliases();
55
56 let config = T::parse();
58 let common = config.common();
59
60 GlobalState::set_unknown_client_enabled(common.unknown_client_enabled);
62
63 let tokio = common.init_tokio_runtime().expect("failed to init tokio runtime");
65
66 tokio.block_on(async {
68 common.tracing.init(&common.sentry).expect("failed to init tracing");
69 });
70
71 common.metrics.init().expect("failed to init metrics");
73
74 let sentry_guard = common
76 .sentry
77 .as_ref()
78 .map(|sentry_config| sentry_config.init(common.env).expect("failed to init sentry"));
79
80 tokio.block_on(spawn_signal_handler()).expect("failed to init signal handlers");
82
83 Self {
84 config,
85 runtime: tokio,
86 _sentry_guard: sentry_guard,
87 }
88 }
89}
90
91#[derive(Clone, Copy, PartialEq, Eq, Debug, strum::Display)]
96pub enum NodeMode {
97 #[strum(to_string = "leader")]
98 Leader,
99
100 #[strum(to_string = "follower")]
101 Follower,
102
103 #[strum(to_string = "fake-leader")]
105 FakeLeader,
106}
107
108pub static STRATUS_SHUTDOWN_SIGNAL: LazyLock<CancellationToken> = LazyLock::new(CancellationToken::new);
113
114static IMPORTER_SHUTDOWN: AtomicBool = AtomicBool::new(true);
116
117pub static IMPORTER_ONLINE_TASKS_SEMAPHORE: LazyLock<Semaphore> = LazyLock::new(|| Semaphore::new(crate::eth::follower::importer::TASKS_COUNT));
119
120static TRANSACTIONS_ENABLED: AtomicBool = AtomicBool::new(true);
122
123static UNKNOWN_CLIENT_ENABLED: AtomicBool = AtomicBool::new(true);
125
126static NODE_MODE: Mutex<NodeMode> = Mutex::new(NodeMode::Follower);
128
129static START_TIME: LazyLock<DateTime<Utc>> = LazyLock::new(Utc::now);
130
131static HEALTH: LazyLock<Sender<bool>> = LazyLock::new(|| tokio::sync::watch::Sender::new(false));
133
134static RESTART_ON_UNHEALTHY: AtomicBool = AtomicBool::new(true);
136
137#[derive(Serialize, Deserialize, Debug)]
138pub struct GlobalState;
139
140impl GlobalState {
141 pub fn set_health(new_health: bool) {
146 HEALTH.send_if_modified(|health| {
147 if *health != new_health {
148 tracing::info!(?new_health, "health status updated");
149 *health = new_health;
150 true
151 } else {
152 false
153 }
154 });
155 }
156
157 pub fn is_healthy() -> bool {
158 *HEALTH.borrow()
159 }
160
161 pub fn get_health_receiver() -> tokio::sync::watch::Receiver<bool> {
162 HEALTH.subscribe()
163 }
164
165 pub fn restart_on_unhealthy() -> bool {
166 RESTART_ON_UNHEALTHY.load(Ordering::Relaxed)
167 }
168
169 pub fn set_restart_on_unhealthy(state: bool) {
170 RESTART_ON_UNHEALTHY.store(state, Ordering::Relaxed);
171 }
172
173 pub fn shutdown_from(caller: &str, reason: &str) -> String {
177 tracing::warn!(%caller, %reason, "application is shutting down");
178 STRATUS_SHUTDOWN_SIGNAL.cancel();
179 format!("{caller} {reason}")
180 }
181
182 pub fn is_shutdown() -> bool {
184 STRATUS_SHUTDOWN_SIGNAL.is_cancelled()
185 }
186
187 pub fn is_shutdown_warn(task_name: &str) -> bool {
189 let shutdown = Self::is_shutdown();
190 if shutdown {
191 warn_task_cancellation(task_name);
192 }
193 shutdown
194 }
195
196 pub async fn wait_shutdown() {
198 STRATUS_SHUTDOWN_SIGNAL.cancelled().await;
199 }
200
201 pub async fn wait_shutdown_warn(task_name: &str) {
203 Self::wait_shutdown().await;
204 warn_task_cancellation(task_name);
205 }
206
207 pub fn shutdown_importer_from(caller: &str, reason: &str) -> String {
215 tracing::warn!(%caller, %reason, "importer is shutting down");
216 Self::set_importer_shutdown(true);
217 format!("{caller} {reason}")
218 }
219
220 pub fn is_importer_shutdown() -> bool {
222 IMPORTER_SHUTDOWN.load(Ordering::Relaxed)
223 }
224
225 pub async fn wait_for_importer_to_finish() {
227 let result = IMPORTER_ONLINE_TASKS_SEMAPHORE
229 .acquire_many(crate::eth::follower::importer::TASKS_COUNT as u32)
230 .await;
231
232 if let Err(e) = result {
233 tracing::error!(reason = ?e, "error waiting for importer to finish");
234 }
235 }
236
237 pub fn is_importer_shutdown_warn(task_name: &str) -> bool {
239 let shutdown = Self::is_importer_shutdown();
240 if shutdown {
241 warn_task_cancellation(task_name);
242 }
243 shutdown
244 }
245
246 pub fn set_importer_shutdown(shutdown: bool) {
247 IMPORTER_SHUTDOWN.store(shutdown, Ordering::Relaxed);
248 }
249
250 pub fn set_transactions_enabled(enabled: bool) {
256 TRANSACTIONS_ENABLED.store(enabled, Ordering::Relaxed);
257 }
258
259 pub fn is_transactions_enabled() -> bool {
261 TRANSACTIONS_ENABLED.load(Ordering::Relaxed)
262 }
263
264 pub fn set_unknown_client_enabled(enabled: bool) {
270 UNKNOWN_CLIENT_ENABLED.store(enabled, Ordering::Relaxed);
271 }
272
273 pub fn is_unknown_client_enabled() -> bool {
275 UNKNOWN_CLIENT_ENABLED.load(Ordering::Relaxed)
276 }
277
278 pub fn initialize_node_mode(config: &StratusConfig) {
284 let StratusConfig {
285 follower, leader, fake_leader, ..
286 } = config;
287
288 let mode = match (follower, leader, fake_leader) {
289 (true, false, false) => NodeMode::Follower,
290 (false, true, false) => NodeMode::Leader,
291 (false, false, true) => NodeMode::FakeLeader,
292 _ => unreachable!("exactly one must be true, config should be checked by clap"),
293 };
294 Self::set_node_mode(mode);
295
296 let should_run_importer = mode != NodeMode::Leader;
297 Self::set_importer_shutdown(not(should_run_importer));
298 }
299
300 pub fn set_node_mode(mode: NodeMode) {
301 *NODE_MODE.lock() = mode;
302 }
303
304 pub fn get_node_mode() -> NodeMode {
305 *NODE_MODE.lock()
306 }
307
308 pub fn get_global_state_as_json(ctx: &RpcContext) -> JsonValue {
313 let start_time = Self::get_start_time();
314 let elapsed_time = {
315 let delta = start_time.signed_duration_since(Utc::now()).abs();
316 let seconds = delta.num_seconds() % 60;
317 let minutes = delta.num_minutes() % 60;
318 let hours = delta.num_hours() % 24;
319 let days = delta.num_days();
320 format!("{days} days and {hours:02}:{minutes:02}:{seconds:02} elapsed")
321 };
322
323 json!({
324 "is_leader": Self::get_node_mode() == NodeMode::Leader || Self::get_node_mode() == NodeMode::FakeLeader,
325 "is_shutdown": Self::is_shutdown(),
326 "is_importer_shutdown": Self::is_importer_shutdown(),
327 "is_interval_miner_running": ctx.server.miner.is_interval_miner_running(),
328 "transactions_enabled": Self::is_transactions_enabled(),
329 "miner_paused": ctx.server.miner.is_paused(),
330 "unknown_client_enabled": Self::is_unknown_client_enabled(),
331 "start_time": start_time.format("%d/%m/%Y %H:%M UTC").to_string(),
332 "elapsed_time": elapsed_time,
333 })
334 }
335
336 fn get_start_time() -> DateTime<Utc> {
337 *START_TIME
338 }
339
340 pub fn setup_start_time() {
341 LazyLock::force(&START_TIME);
342 }
343}