stratus/
globals.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
use std::fmt::Debug;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::LazyLock;

use chrono::DateTime;
use chrono::Utc;
use parking_lot::Mutex;
use sentry::ClientInitGuard;
use serde::Deserialize;
use serde::Serialize;
use serde_json::json;
use tokio::runtime::Runtime;
use tokio::sync::Semaphore;
use tokio_util::sync::CancellationToken;

use crate::alias::JsonValue;
use crate::config;
use crate::config::StratusConfig;
use crate::config::WithCommonConfig;
use crate::eth::follower::importer::Importer;
use crate::eth::rpc::RpcContext;
use crate::ext::not;
use crate::ext::spawn_signal_handler;
use crate::infra::tracing::warn_task_cancellation;

// -----------------------------------------------------------------------------
// Global services
// -----------------------------------------------------------------------------

pub struct GlobalServices<T>
where
    T: clap::Parser + WithCommonConfig + Debug,
{
    pub config: T,
    pub runtime: Runtime,
    _sentry_guard: Option<ClientInitGuard>,
}

impl<T> GlobalServices<T>
where
    T: clap::Parser + WithCommonConfig + Debug,
{
    #[allow(clippy::expect_used)]
    /// Executes global services initialization.
    pub fn init() -> Self
    where
        T: clap::Parser + WithCommonConfig + Debug,
    {
        GlobalState::setup_start_time();

        // env-var support
        config::load_dotenv_file();
        config::load_env_aliases();

        // parse configuration
        let config = T::parse();
        let common = config.common();

        // Set the unknown_client_enabled value
        GlobalState::set_unknown_client_enabled(common.unknown_client_enabled);

        // init tokio
        let tokio = common.init_tokio_runtime().expect("failed to init tokio runtime");

        // init tracing
        tokio.block_on(async {
            common.tracing.init(&common.sentry).expect("failed to init tracing");
        });

        // init observability services
        common.metrics.init().expect("failed to init metrics");

        // init sentry
        let sentry_guard = common
            .sentry
            .as_ref()
            .map(|sentry_config| sentry_config.init(common.env).expect("failed to init sentry"));

        // init signal handler
        tokio.block_on(spawn_signal_handler()).expect("failed to init signal handlers");

        Self {
            config,
            runtime: tokio,
            _sentry_guard: sentry_guard,
        }
    }
}

// -----------------------------------------------------------------------------
// Node mode
// -----------------------------------------------------------------------------

#[derive(Clone, Copy, PartialEq, Eq, Debug, strum::Display)]
pub enum NodeMode {
    #[strum(to_string = "leader")]
    Leader,

    #[strum(to_string = "follower")]
    Follower,

    /// Fake leader feches a block, re-executes its txs and then mines it's own block.
    #[strum(to_string = "fake-leader")]
    FakeLeader,
}

// -----------------------------------------------------------------------------
// Global state
// -----------------------------------------------------------------------------

pub static STRATUS_SHUTDOWN_SIGNAL: LazyLock<CancellationToken> = LazyLock::new(CancellationToken::new);

/// Importer is running or being shut-down?
static IMPORTER_SHUTDOWN: AtomicBool = AtomicBool::new(true);

/// A guard that is taken when importer is running.
pub static IMPORTER_ONLINE_TASKS_SEMAPHORE: LazyLock<Semaphore> = LazyLock::new(|| Semaphore::new(Importer::TASKS_COUNT));

/// Transaction should be accepted?
static TRANSACTIONS_ENABLED: AtomicBool = AtomicBool::new(true);

/// Unknown clients can interact with the application?
static UNKNOWN_CLIENT_ENABLED: AtomicBool = AtomicBool::new(true);

/// Current node mode.
static NODE_MODE: Mutex<NodeMode> = Mutex::new(NodeMode::Follower);

static START_TIME: LazyLock<DateTime<Utc>> = LazyLock::new(Utc::now);

#[derive(Serialize, Deserialize, Debug)]
pub struct GlobalState;

impl GlobalState {
    // -------------------------------------------------------------------------
    // Application Shutdown
    // -------------------------------------------------------------------------

    /// Shutdown the application.
    ///
    /// Returns the formatted reason for shutdown.
    pub fn shutdown_from(caller: &str, reason: &str) -> String {
        tracing::warn!(%caller, %reason, "application is shutting down");
        STRATUS_SHUTDOWN_SIGNAL.cancel();
        format!("{caller} {reason}")
    }

    /// Checks if the application is being shutdown.
    pub fn is_shutdown() -> bool {
        STRATUS_SHUTDOWN_SIGNAL.is_cancelled()
    }

    /// Checks if the application is being shutdown. Emits an warning with the task name in case it is.
    pub fn is_shutdown_warn(task_name: &str) -> bool {
        let shutdown = Self::is_shutdown();
        if shutdown {
            warn_task_cancellation(task_name);
        }
        shutdown
    }

    /// Waits until a shutdown is signalled.
    pub async fn wait_shutdown() {
        STRATUS_SHUTDOWN_SIGNAL.cancelled().await;
    }

    /// Waits until a shutdown is signalled. Emits an warning with the task name when it is.
    pub async fn wait_shutdown_warn(task_name: &str) {
        Self::wait_shutdown().await;
        warn_task_cancellation(task_name);
    }

    // -------------------------------------------------------------------------
    // Importer Shutdown
    // -------------------------------------------------------------------------

    /// Shutdown the importer.
    ///
    /// Returns the formatted reason for importer shutdown.
    pub fn shutdown_importer_from(caller: &str, reason: &str) -> String {
        tracing::warn!(%caller, %reason, "importer is shutting down");
        Self::set_importer_shutdown(true);
        format!("{caller} {reason}")
    }

    /// Checks if the importer is shut down or being shut down.
    pub fn is_importer_shutdown() -> bool {
        IMPORTER_SHUTDOWN.load(Ordering::Relaxed)
    }

    /// Waits till importer is done.
    pub async fn wait_for_importer_to_finish() {
        // 3 permits will be available when all 3 tasks are finished
        let result = IMPORTER_ONLINE_TASKS_SEMAPHORE.acquire_many(Importer::TASKS_COUNT as u32).await;

        if let Err(e) = result {
            tracing::error!(reason = ?e, "error waiting for importer to finish");
        }
    }

    /// Checks if the importer is being shutdown. Emits a warning with the task name in case it is.
    pub fn is_importer_shutdown_warn(task_name: &str) -> bool {
        let shutdown = Self::is_importer_shutdown();
        if shutdown {
            warn_task_cancellation(task_name);
        }
        shutdown
    }

    pub fn set_importer_shutdown(shutdown: bool) {
        IMPORTER_SHUTDOWN.store(shutdown, Ordering::Relaxed);
    }

    // -------------------------------------------------------------------------
    // Transaction
    // -------------------------------------------------------------------------

    /// Sets whether transactions should be accepted.
    pub fn set_transactions_enabled(enabled: bool) {
        TRANSACTIONS_ENABLED.store(enabled, Ordering::Relaxed);
    }

    /// Checks if transactions are enabled.
    pub fn is_transactions_enabled() -> bool {
        TRANSACTIONS_ENABLED.load(Ordering::Relaxed)
    }

    // -------------------------------------------------------------------------
    // Unknown Client
    // -------------------------------------------------------------------------

    /// Enables or disables the unknown client.
    pub fn set_unknown_client_enabled(enabled: bool) {
        UNKNOWN_CLIENT_ENABLED.store(enabled, Ordering::Relaxed);
    }

    /// Checks if the unknown client is enabled.
    pub fn is_unknown_client_enabled() -> bool {
        UNKNOWN_CLIENT_ENABLED.load(Ordering::Relaxed)
    }

    // -------------------------------------------------------------------------
    // Node Mode
    // -------------------------------------------------------------------------

    /// Initializes the node mode based on the StratusConfig.
    pub fn initialize_node_mode(config: &StratusConfig) {
        let StratusConfig {
            follower, leader, fake_leader, ..
        } = config;

        let mode = match (follower, leader, fake_leader) {
            (true, false, false) => NodeMode::Follower,
            (false, true, false) => NodeMode::Leader,
            (false, false, true) => NodeMode::FakeLeader,
            _ => unreachable!("exactly one must be true, config should be checked by clap"),
        };
        Self::set_node_mode(mode);

        let should_run_importer = mode != NodeMode::Leader;
        Self::set_importer_shutdown(not(should_run_importer));
    }

    pub fn set_node_mode(mode: NodeMode) {
        *NODE_MODE.lock() = mode;
    }

    pub fn get_node_mode() -> NodeMode {
        *NODE_MODE.lock()
    }

    // -------------------------------------------------------------------------
    // JSON State
    // -------------------------------------------------------------------------

    pub fn get_global_state_as_json(ctx: &RpcContext) -> JsonValue {
        let start_time = Self::get_start_time();
        let elapsed_time = {
            let delta = start_time.signed_duration_since(Utc::now()).abs();
            let seconds = delta.num_seconds() % 60;
            let minutes = delta.num_minutes() % 60;
            let hours = delta.num_hours() % 24;
            let days = delta.num_days();
            format!("{days} days and {hours:02}:{minutes:02}:{seconds:02} elapsed")
        };

        json!({
            "is_leader": Self::get_node_mode() == NodeMode::Leader || Self::get_node_mode() == NodeMode::FakeLeader,
            "is_shutdown": Self::is_shutdown(),
            "is_importer_shutdown": Self::is_importer_shutdown(),
            "is_interval_miner_running": ctx.miner.is_interval_miner_running(),
            "transactions_enabled": Self::is_transactions_enabled(),
            "miner_paused": ctx.miner.is_paused(),
            "unknown_client_enabled": Self::is_unknown_client_enabled(),
            "start_time": start_time.format("%d/%m/%Y %H:%M UTC").to_string(),
            "elapsed_time": elapsed_time,
        })
    }

    fn get_start_time() -> DateTime<Utc> {
        *START_TIME
    }

    pub fn setup_start_time() {
        LazyLock::force(&START_TIME);
    }
}