stratus/
config.rs

1//! Application configuration.
2
3use std::env;
4use std::str::FromStr;
5use std::sync::atomic::AtomicUsize;
6use std::sync::atomic::Ordering;
7use std::time::Duration;
8
9use anyhow::anyhow;
10use clap::ArgGroup;
11use clap::Parser;
12use display_json::DebugAsJson;
13use strum::VariantNames;
14use tokio::runtime::Builder;
15use tokio::runtime::Runtime;
16
17use crate::eth::executor::ExecutorConfig;
18use crate::eth::external_rpc::ExternalRpcConfig;
19use crate::eth::follower::importer::ImporterConfig;
20use crate::eth::miner::MinerConfig;
21use crate::eth::primitives::Address;
22use crate::eth::rpc::RpcServerConfig;
23use crate::eth::storage::StorageConfig;
24use crate::ext::parse_duration;
25use crate::infra::build_info;
26use crate::infra::kafka::KafkaConfig;
27use crate::infra::metrics::MetricsConfig;
28use crate::infra::sentry::SentryConfig;
29use crate::infra::tracing::TracingConfig;
30
31/// Loads .env files according to the binary and environment.
32pub fn load_dotenv_file() {
33    // parse env manually because this is executed before clap
34    let env = match std::env::var("ENV") {
35        Ok(env) => Environment::from_str(env.as_str()),
36        Err(_) => Ok(Environment::Local),
37    };
38
39    // determine the .env file to load
40    let env_filename = match env {
41        Ok(Environment::Local) => {
42            // local environment only
43            match std::env::var("LOCAL_ENV_PATH") {
44                Ok(local_path) => local_path,
45                Err(_) => format!("config/{}.env.local", build_info::binary_name()),
46            }
47        }
48        Ok(env) => format!("config/{}.env.{}", build_info::binary_name(), env),
49        Err(e) => {
50            println!("{e}");
51            return;
52        }
53    };
54
55    println!("reading env file | filename={env_filename}");
56
57    if let Err(e) = dotenvy::from_filename(env_filename) {
58        println!("env file error: {e}");
59    }
60}
61
/// Copies alias environment variables onto their canonical names, because Clap does not
/// support env-var aliases.
///
/// For every `(canonical, alias)` pair, when the alias is set to a valid Unicode value, the
/// canonical variable is overwritten with it. Pairs are applied in declaration order, so when
/// two aliases target the same canonical name and both are set, the later one wins.
pub fn load_env_aliases() {
    // (canonical name, alias) — order matters, see the function documentation
    const ALIASES: [(&str, &str); 7] = [
        ("EXECUTOR_CHAIN_ID", "CHAIN_ID"),
        ("EXECUTOR_EVMS", "EVMS"),
        ("EXECUTOR_EVMS", "NUM_EVMS"),
        ("EXECUTOR_REJECT_NOT_CONTRACT", "REJECT_NOT_CONTRACT"),
        ("EXECUTOR_STRATEGY", "STRATEGY"),
        ("TRACING_LOG_FORMAT", "LOG_FORMAT"),
        ("TRACING_URL", "TRACING_COLLECTOR_URL"),
    ];

    for (canonical, alias) in ALIASES {
        // unset (or non-Unicode) aliases leave the canonical variable untouched
        let Ok(value) = env::var(alias) else {
            continue;
        };

        // SAFETY: `set_var` must not race with other threads reading or writing the
        // environment. NOTE(review): presumably this runs during single-threaded startup,
        // before any runtime threads exist — confirm at the call sites.
        unsafe {
            env::set_var(canonical, value);
        }
    }
}
79
80// -----------------------------------------------------------------------------
81// Config: Common
82// -----------------------------------------------------------------------------
83
84pub trait WithCommonConfig {
85    fn common(&self) -> &CommonConfig;
86}
87
88/// Configuration that can be used by any binary.
89#[derive(DebugAsJson, Clone, Parser, serde::Serialize)]
90#[command(author, version, about, long_about = None)]
91pub struct CommonConfig {
92    /// Environment where the application is running.
93    #[arg(long = "env", env = "ENV", default_value = "local")]
94    pub env: Environment,
95
96    /// Number of threads to execute global async tasks.
97    #[arg(long = "async-threads", env = "ASYNC_THREADS", default_value = "32")]
98    pub num_async_threads: usize,
99
100    /// Number of threads to execute global blocking tasks.
101    #[arg(long = "blocking-threads", env = "BLOCKING_THREADS", default_value = "512")]
102    pub num_blocking_threads: usize,
103
104    #[clap(flatten)]
105    pub tracing: TracingConfig,
106
107    #[clap(flatten)]
108    pub sentry: Option<SentryConfig>,
109
110    #[clap(flatten)]
111    pub metrics: MetricsConfig,
112
113    /// Prevents clap from breaking when passing `nocapture` options in tests.
114    #[arg(long = "nocapture")]
115    pub nocapture: bool,
116
117    /// Enables or disables unknown client interactions.
118    #[arg(long = "unknown-client-enabled", env = "UNKNOWN_CLIENT_ENABLED", default_value = "true")]
119    pub unknown_client_enabled: bool,
120}
121
122impl WithCommonConfig for CommonConfig {
123    fn common(&self) -> &CommonConfig {
124        self
125    }
126}
127
128impl CommonConfig {
129    /// Initializes Tokio runtime.
130    pub fn init_tokio_runtime(&self) -> anyhow::Result<Runtime> {
131        println!(
132            "creating tokio runtime | async_threads={} blocking_threads={}",
133            self.num_async_threads, self.num_blocking_threads
134        );
135
136        let num_async_threads = self.num_async_threads;
137        let num_blocking_threads = self.num_blocking_threads;
138        let result = Builder::new_multi_thread()
139            .enable_all()
140            .worker_threads(num_async_threads)
141            .max_blocking_threads(num_blocking_threads)
142            .thread_keep_alive(Duration::from_secs(u64::MAX))
143            .thread_name_fn(move || {
144                // Tokio first create all async threads, then all blocking threads.
145                // Threads are not expected to die because Tokio catches panics and blocking threads are configured to never die.
146                // If one of these premises are not true anymore, this will possibly categorize threads wrongly.
147
148                static ASYNC_ID: AtomicUsize = AtomicUsize::new(1);
149                static BLOCKING_ID: AtomicUsize = AtomicUsize::new(1);
150
151                // identify async threads
152                let async_id = ASYNC_ID.fetch_add(1, Ordering::SeqCst);
153                if async_id <= num_async_threads {
154                    if cfg!(feature = "flamegraph") {
155                        return "tokio-async".to_string();
156                    } else {
157                        return format!("tokio-async-{async_id}");
158                    }
159                }
160
161                // identify blocking threads
162                let blocking_id = BLOCKING_ID.fetch_add(1, Ordering::SeqCst);
163                if cfg!(feature = "flamegraph") {
164                    "tokio-blocking".to_string()
165                } else {
166                    format!("tokio-blocking-{blocking_id}")
167                }
168            })
169            .build();
170
171        match result {
172            Ok(runtime) => Ok(runtime),
173            Err(e) => {
174                println!("failed to create tokio runtime | reason={e:?}");
175                Err(e.into())
176            }
177        }
178    }
179}
180
181// -----------------------------------------------------------------------------
182// Config: Stratus
183// -----------------------------------------------------------------------------
184
185/// Configuration for main Stratus service.
186#[derive(DebugAsJson, Clone, Parser, derive_more::Deref, serde::Serialize)]
187#[clap(group = ArgGroup::new("mode").required(true).args(&["leader", "follower", "fake_leader"]))]
188pub struct StratusConfig {
189    #[arg(long = "leader", env = "LEADER", conflicts_with_all = ["follower", "fake_leader", "ImporterConfig"])]
190    pub leader: bool,
191
192    #[arg(long = "follower", env = "FOLLOWER", conflicts_with_all = ["leader", "fake_leader"], requires = "ImporterConfig")]
193    pub follower: bool,
194
195    /// The fake leader imports blocks like a follower, but executes the blocks's txs locally like a leader.
196    #[arg(long = "fake-leader", env = "FAKE_LEADER", conflicts_with_all = ["leader", "follower"], requires = "ImporterConfig")]
197    pub fake_leader: bool,
198
199    #[clap(flatten)]
200    pub rpc_server: RpcServerConfig,
201
202    #[clap(flatten)]
203    pub storage: StorageConfig,
204
205    #[clap(flatten)]
206    pub executor: ExecutorConfig,
207
208    #[clap(flatten)]
209    pub miner: MinerConfig,
210
211    #[deref]
212    #[clap(flatten)]
213    pub common: CommonConfig,
214
215    #[clap(flatten)]
216    pub importer: Option<ImporterConfig>,
217
218    #[clap(flatten)]
219    pub kafka_config: Option<KafkaConfig>,
220}
221
222impl WithCommonConfig for StratusConfig {
223    fn common(&self) -> &CommonConfig {
224        &self.common
225    }
226}
227
228// -----------------------------------------------------------------------------
229// Config: RpcDownloader
230// -----------------------------------------------------------------------------
231
232/// Configuration for `rpc-downlaoder` binary.
233#[derive(DebugAsJson, Clone, Parser, derive_more::Deref, serde::Serialize)]
234pub struct RpcDownloaderConfig {
235    /// Final block number to be downloaded.
236    #[arg(long = "block-end", env = "BLOCK_END")]
237    pub block_end: Option<u64>,
238
239    #[clap(flatten)]
240    pub rpc_storage: ExternalRpcConfig,
241
242    /// External RPC endpoint to sync blocks with Stratus.
243    #[arg(short = 'r', long = "external-rpc", env = "EXTERNAL_RPC")]
244    pub external_rpc: String,
245
246    /// Timeout for blockchain requests
247    #[arg(long = "external-rpc-timeout", value_parser=parse_duration, env = "EXTERNAL_RPC_TIMEOUT", default_value = "2s")]
248    pub external_rpc_timeout: Duration,
249
250    /// Maximum response size in bytes for external RPC requests
251    #[arg(
252        long = "external-rpc-max-response-size-bytes",
253        env = "EXTERNAL_RPC_MAX_RESPONSE_SIZE_BYTES",
254        default_value = "10485760"
255    )]
256    pub external_rpc_max_response_size_bytes: u32,
257
258    /// Number of parallel downloads.
259    #[arg(short = 'p', long = "paralellism", env = "PARALELLISM", default_value = "1")]
260    pub paralellism: usize,
261
262    /// Accounts to retrieve initial balance information.
263    ///
264    /// For Cloudwalk networks, provide these addresses:
265    /// - Mainnet: 0xF56A88A4afF45cdb5ED7Fe63a8b71aEAaFF24FA6
266    /// - Testnet: 0xE45b176cAd7090A5CF70B69a73b6DEF9296ba6A2
267    #[arg(long = "initial-accounts", env = "INITIAL_ACCOUNTS", value_delimiter = ',')]
268    pub initial_accounts: Vec<Address>,
269
270    #[deref]
271    #[clap(flatten)]
272    pub common: CommonConfig,
273}
274
275impl WithCommonConfig for RpcDownloaderConfig {
276    fn common(&self) -> &CommonConfig {
277        &self.common
278    }
279}
280
281// -----------------------------------------------------------------------------
282// Config: ImporterOffline
283// -----------------------------------------------------------------------------
284
285/// Configuration for `importer-offline` binary.
286#[derive(Parser, DebugAsJson, derive_more::Deref, serde::Serialize)]
287pub struct ImporterOfflineConfig {
288    /// Initial block number to be imported.
289    #[arg(long = "block-start", env = "BLOCK_START")]
290    pub block_start: Option<u64>,
291
292    /// Final block number to be imported.
293    #[arg(long = "block-end", env = "BLOCK_END")]
294    pub block_end: Option<u64>,
295
296    /// Number of parallel database fetches.
297    #[arg(short = 'p', long = "paralellism", env = "PARALELLISM", default_value = "1")]
298    pub paralellism: usize,
299
300    /// Number of blocks by database fetch.
301    #[arg(short = 'b', long = "blocks-by-fetch", env = "BLOCKS_BY_FETCH", default_value = "10000")]
302    pub blocks_by_fetch: usize,
303
304    /// Number of blocks to be accumulated before sending to the block saver. (cache needs to be sufficiently big)
305    #[arg(long = "block-saver-batch-size", env = "BLOCK_SAVER_BATCH_SIZE", default_value = "100")]
306    pub block_saver_batch_size: usize,
307
308    /// Number of blocks batches that can be queued to the saver before blocking. (cache needs to be sufficiently big)
309    #[arg(long = "block-saver-queue-size", env = "BLOCK_SAVER_QUEUE_SIZE", default_value = "10")]
310    pub block_saver_queue_size: usize,
311
312    #[clap(flatten)]
313    pub executor: ExecutorConfig,
314
315    #[clap(flatten)]
316    pub miner: MinerConfig,
317
318    #[clap(flatten)]
319    pub storage: StorageConfig,
320
321    #[clap(flatten)]
322    pub rpc_storage: ExternalRpcConfig,
323
324    #[deref]
325    #[clap(flatten)]
326    pub common: CommonConfig,
327}
328
329impl WithCommonConfig for ImporterOfflineConfig {
330    fn common(&self) -> &CommonConfig {
331        &self.common
332    }
333}
334
335// -----------------------------------------------------------------------------
336// Enum: Env
337// -----------------------------------------------------------------------------
338#[derive(DebugAsJson, strum::Display, strum::VariantNames, Clone, Copy, Parser, serde::Serialize)]
339pub enum Environment {
340    #[serde(rename = "local")]
341    #[strum(to_string = "local")]
342    Local,
343
344    #[serde(rename = "staging")]
345    #[strum(to_string = "staging")]
346    Staging,
347
348    #[serde(rename = "production")]
349    #[strum(to_string = "production")]
350    Production,
351
352    #[serde(rename = "canary")]
353    #[strum(to_string = "canary")]
354    Canary,
355}
356
357impl FromStr for Environment {
358    type Err = anyhow::Error;
359
360    fn from_str(s: &str) -> anyhow::Result<Self, Self::Err> {
361        let s = s.trim().to_lowercase();
362        match s.as_ref() {
363            "local" => Ok(Self::Local),
364            "staging" | "test" => Ok(Self::Staging),
365            "production" | "prod" => Ok(Self::Production),
366            "canary" => Ok(Self::Canary),
367            s => Err(anyhow!("unknown environment: \"{}\" - valid values are {:?}", s, Self::VARIANTS)),
368        }
369    }
370}
371
372/// Genesis configuration
373#[derive(DebugAsJson, Clone, Parser, serde::Serialize, Default)]
374pub struct GenesisFileConfig {
375    /// Path to the genesis.json file
376    #[arg(long = "genesis-path", env = "GENESIS_JSON_PATH")]
377    pub genesis_path: Option<String>,
378}
379
#[cfg(test)]
mod tests {
    use std::env;

    use super::*;

    /// Checks the sources of `genesis_path`: CLI flag, environment variable, and the flag
    /// taking precedence when both are present.
    #[test]
    fn test_genesis_file_config() {
        const GENESIS_ENV: &str = "GENESIS_JSON_PATH";

        // value given on the command line
        let from_cli = GenesisFileConfig::parse_from(["program", "--genesis-path", "/path/to/genesis.json"]);
        assert_eq!(from_cli.genesis_path.as_deref(), Some("/path/to/genesis.json"));

        // value given only through the environment variable
        // SAFETY: NOTE(review): mutates the process environment; assumes no other test reads
        // or writes this variable concurrently — confirm.
        unsafe {
            env::set_var(GENESIS_ENV, "/env/path/to/genesis.json");
        }
        let from_env = GenesisFileConfig::parse_from(["program"]);
        assert_eq!(from_env.genesis_path.as_deref(), Some("/env/path/to/genesis.json"));

        // with both present, the command line wins over the environment variable
        let from_both = GenesisFileConfig::parse_from(["program", "--genesis-path", "/cli/path/to/genesis.json"]);
        assert_eq!(from_both.genesis_path.as_deref(), Some("/cli/path/to/genesis.json"));

        // leave the environment as it was found
        // SAFETY: same assumption as for `set_var` above.
        unsafe {
            env::remove_var(GENESIS_ENV);
        }
    }
}