stratus/
globals.rs

1use std::fmt::Debug;
2use std::sync::LazyLock;
3use std::sync::atomic::AtomicBool;
4use std::sync::atomic::Ordering;
5
6use chrono::DateTime;
7use chrono::Utc;
8use parking_lot::Mutex;
9use sentry::ClientInitGuard;
10use serde::Deserialize;
11use serde::Serialize;
12use serde_json::json;
13use tokio::runtime::Runtime;
14use tokio::sync::Semaphore;
15use tokio::sync::watch::Sender;
16use tokio_util::sync::CancellationToken;
17
18use crate::alias::JsonValue;
19use crate::config;
20use crate::config::StratusConfig;
21use crate::config::WithCommonConfig;
22use crate::eth::rpc::RpcContext;
23use crate::ext::not;
24use crate::ext::spawn_signal_handler;
25use crate::infra::tracing::warn_task_cancellation;
26
27// -----------------------------------------------------------------------------
28// Global services
29// -----------------------------------------------------------------------------
30
31pub struct GlobalServices<T>
32where
33    T: clap::Parser + WithCommonConfig + Debug,
34{
35    pub config: T,
36    pub runtime: Runtime,
37    _sentry_guard: Option<ClientInitGuard>,
38}
39
40impl<T> GlobalServices<T>
41where
42    T: clap::Parser + WithCommonConfig + Debug,
43{
44    #[allow(clippy::expect_used)]
45    /// Executes global services initialization.
46    pub fn init() -> Self
47    where
48        T: clap::Parser + WithCommonConfig + Debug,
49    {
50        GlobalState::setup_start_time();
51
52        // env-var support
53        config::load_dotenv_file();
54        config::load_env_aliases();
55
56        // parse configuration
57        let config = T::parse();
58        let common = config.common();
59
60        // Set the unknown_client_enabled value
61        GlobalState::set_unknown_client_enabled(common.unknown_client_enabled);
62
63        // init tokio
64        let tokio = common.init_tokio_runtime().expect("failed to init tokio runtime");
65
66        // init tracing
67        tokio.block_on(async {
68            common.tracing.init(&common.sentry).expect("failed to init tracing");
69        });
70
71        // init observability services
72        common.metrics.init().expect("failed to init metrics");
73
74        // init sentry
75        let sentry_guard = common
76            .sentry
77            .as_ref()
78            .map(|sentry_config| sentry_config.init(common.env).expect("failed to init sentry"));
79
80        // init signal handler
81        tokio.block_on(spawn_signal_handler()).expect("failed to init signal handlers");
82
83        Self {
84            config,
85            runtime: tokio,
86            _sentry_guard: sentry_guard,
87        }
88    }
89}
90
91// -----------------------------------------------------------------------------
92// Node mode
93// -----------------------------------------------------------------------------
94
95#[derive(Clone, Copy, PartialEq, Eq, Debug, strum::Display)]
96pub enum NodeMode {
97    #[strum(to_string = "leader")]
98    Leader,
99
100    #[strum(to_string = "follower")]
101    Follower,
102
103    /// Fake leader feches a block, re-executes its txs and then mines it's own block.
104    #[strum(to_string = "fake-leader")]
105    FakeLeader,
106}
107
108// -----------------------------------------------------------------------------
109// Global state
110// -----------------------------------------------------------------------------
111
112pub static STRATUS_SHUTDOWN_SIGNAL: LazyLock<CancellationToken> = LazyLock::new(CancellationToken::new);
113
114/// Importer is running or being shut-down?
115static IMPORTER_SHUTDOWN: AtomicBool = AtomicBool::new(true);
116
117/// A guard that is taken when importer is running.
118pub static IMPORTER_ONLINE_TASKS_SEMAPHORE: LazyLock<Semaphore> = LazyLock::new(|| Semaphore::new(crate::eth::follower::importer::TASKS_COUNT));
119
120/// Transaction should be accepted?
121static TRANSACTIONS_ENABLED: AtomicBool = AtomicBool::new(true);
122
123/// Unknown clients can interact with the application?
124static UNKNOWN_CLIENT_ENABLED: AtomicBool = AtomicBool::new(true);
125
126/// Current node mode.
127static NODE_MODE: Mutex<NodeMode> = Mutex::new(NodeMode::Follower);
128
129static START_TIME: LazyLock<DateTime<Utc>> = LazyLock::new(Utc::now);
130
131/// Is stratus healthy?
132static HEALTH: LazyLock<Sender<bool>> = LazyLock::new(|| tokio::sync::watch::Sender::new(false));
133
134/// Should stratus restart when unhealthy?
135static RESTART_ON_UNHEALTHY: AtomicBool = AtomicBool::new(true);
136
137#[derive(Serialize, Deserialize, Debug)]
138pub struct GlobalState;
139
140impl GlobalState {
141    // -------------------------------------------------------------------------
142    // Application Shutdown
143    // -------------------------------------------------------------------------
144
145    pub fn set_health(new_health: bool) {
146        HEALTH.send_if_modified(|health| {
147            if *health != new_health {
148                tracing::info!(?new_health, "health status updated");
149                *health = new_health;
150                true
151            } else {
152                false
153            }
154        });
155    }
156
157    pub fn is_healthy() -> bool {
158        *HEALTH.borrow()
159    }
160
161    pub fn get_health_receiver() -> tokio::sync::watch::Receiver<bool> {
162        HEALTH.subscribe()
163    }
164
165    pub fn restart_on_unhealthy() -> bool {
166        RESTART_ON_UNHEALTHY.load(Ordering::Relaxed)
167    }
168
169    pub fn set_restart_on_unhealthy(state: bool) {
170        RESTART_ON_UNHEALTHY.store(state, Ordering::Relaxed);
171    }
172
173    /// Shutdown the application.
174    ///
175    /// Returns the formatted reason for shutdown.
176    pub fn shutdown_from(caller: &str, reason: &str) -> String {
177        tracing::warn!(%caller, %reason, "application is shutting down");
178        STRATUS_SHUTDOWN_SIGNAL.cancel();
179        format!("{caller} {reason}")
180    }
181
182    /// Checks if the application is being shutdown.
183    pub fn is_shutdown() -> bool {
184        STRATUS_SHUTDOWN_SIGNAL.is_cancelled()
185    }
186
187    /// Checks if the application is being shutdown. Emits an warning with the task name in case it is.
188    pub fn is_shutdown_warn(task_name: &str) -> bool {
189        let shutdown = Self::is_shutdown();
190        if shutdown {
191            warn_task_cancellation(task_name);
192        }
193        shutdown
194    }
195
196    /// Waits until a shutdown is signalled.
197    pub async fn wait_shutdown() {
198        STRATUS_SHUTDOWN_SIGNAL.cancelled().await;
199    }
200
201    /// Waits until a shutdown is signalled. Emits an warning with the task name when it is.
202    pub async fn wait_shutdown_warn(task_name: &str) {
203        Self::wait_shutdown().await;
204        warn_task_cancellation(task_name);
205    }
206
207    // -------------------------------------------------------------------------
208    // Importer Shutdown
209    // -------------------------------------------------------------------------
210
211    /// Shutdown the importer.
212    ///
213    /// Returns the formatted reason for importer shutdown.
214    pub fn shutdown_importer_from(caller: &str, reason: &str) -> String {
215        tracing::warn!(%caller, %reason, "importer is shutting down");
216        Self::set_importer_shutdown(true);
217        format!("{caller} {reason}")
218    }
219
220    /// Checks if the importer is shut down or being shut down.
221    pub fn is_importer_shutdown() -> bool {
222        IMPORTER_SHUTDOWN.load(Ordering::Relaxed)
223    }
224
225    /// Waits till importer is done.
226    pub async fn wait_for_importer_to_finish() {
227        // 3 permits will be available when all 3 tasks are finished
228        let result = IMPORTER_ONLINE_TASKS_SEMAPHORE
229            .acquire_many(crate::eth::follower::importer::TASKS_COUNT as u32)
230            .await;
231
232        if let Err(e) = result {
233            tracing::error!(reason = ?e, "error waiting for importer to finish");
234        }
235    }
236
237    /// Checks if the importer is being shutdown. Emits a warning with the task name in case it is.
238    pub fn is_importer_shutdown_warn(task_name: &str) -> bool {
239        let shutdown = Self::is_importer_shutdown();
240        if shutdown {
241            warn_task_cancellation(task_name);
242        }
243        shutdown
244    }
245
246    pub fn set_importer_shutdown(shutdown: bool) {
247        IMPORTER_SHUTDOWN.store(shutdown, Ordering::Relaxed);
248    }
249
250    // -------------------------------------------------------------------------
251    // Transaction
252    // -------------------------------------------------------------------------
253
254    /// Sets whether transactions should be accepted.
255    pub fn set_transactions_enabled(enabled: bool) {
256        TRANSACTIONS_ENABLED.store(enabled, Ordering::Relaxed);
257    }
258
259    /// Checks if transactions are enabled.
260    pub fn is_transactions_enabled() -> bool {
261        TRANSACTIONS_ENABLED.load(Ordering::Relaxed)
262    }
263
264    // -------------------------------------------------------------------------
265    // Unknown Client
266    // -------------------------------------------------------------------------
267
268    /// Enables or disables the unknown client.
269    pub fn set_unknown_client_enabled(enabled: bool) {
270        UNKNOWN_CLIENT_ENABLED.store(enabled, Ordering::Relaxed);
271    }
272
273    /// Checks if the unknown client is enabled.
274    pub fn is_unknown_client_enabled() -> bool {
275        UNKNOWN_CLIENT_ENABLED.load(Ordering::Relaxed)
276    }
277
278    // -------------------------------------------------------------------------
279    // Node Mode
280    // -------------------------------------------------------------------------
281
282    /// Initializes the node mode based on the StratusConfig.
283    pub fn initialize_node_mode(config: &StratusConfig) {
284        let StratusConfig {
285            follower, leader, fake_leader, ..
286        } = config;
287
288        let mode = match (follower, leader, fake_leader) {
289            (true, false, false) => NodeMode::Follower,
290            (false, true, false) => NodeMode::Leader,
291            (false, false, true) => NodeMode::FakeLeader,
292            _ => unreachable!("exactly one must be true, config should be checked by clap"),
293        };
294        Self::set_node_mode(mode);
295
296        let should_run_importer = mode != NodeMode::Leader;
297        Self::set_importer_shutdown(not(should_run_importer));
298    }
299
300    pub fn set_node_mode(mode: NodeMode) {
301        *NODE_MODE.lock() = mode;
302    }
303
304    pub fn get_node_mode() -> NodeMode {
305        *NODE_MODE.lock()
306    }
307
308    // -------------------------------------------------------------------------
309    // JSON State
310    // -------------------------------------------------------------------------
311
312    pub fn get_global_state_as_json(ctx: &RpcContext) -> JsonValue {
313        let start_time = Self::get_start_time();
314        let elapsed_time = {
315            let delta = start_time.signed_duration_since(Utc::now()).abs();
316            let seconds = delta.num_seconds() % 60;
317            let minutes = delta.num_minutes() % 60;
318            let hours = delta.num_hours() % 24;
319            let days = delta.num_days();
320            format!("{days} days and {hours:02}:{minutes:02}:{seconds:02} elapsed")
321        };
322
323        json!({
324            "is_leader": Self::get_node_mode() == NodeMode::Leader || Self::get_node_mode() == NodeMode::FakeLeader,
325            "is_shutdown": Self::is_shutdown(),
326            "is_importer_shutdown": Self::is_importer_shutdown(),
327            "is_interval_miner_running": ctx.server.miner.is_interval_miner_running(),
328            "transactions_enabled": Self::is_transactions_enabled(),
329            "miner_paused": ctx.server.miner.is_paused(),
330            "unknown_client_enabled": Self::is_unknown_client_enabled(),
331            "start_time": start_time.format("%d/%m/%Y %H:%M UTC").to_string(),
332            "elapsed_time": elapsed_time,
333        })
334    }
335
336    fn get_start_time() -> DateTime<Utc> {
337        *START_TIME
338    }
339
340    pub fn setup_start_time() {
341        LazyLock::force(&START_TIME);
342    }
343}