use std::env;
use std::str::FromStr;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::time::Duration;

use anyhow::anyhow;
use clap::ArgGroup;
use clap::Parser;
use display_json::DebugAsJson;
use strum::VariantNames;
use tokio::runtime::Builder;
use tokio::runtime::Runtime;

use crate::eth::executor::ExecutorConfig;
use crate::eth::external_rpc::ExternalRpcConfig;
use crate::eth::follower::importer::ImporterConfig;
use crate::eth::miner::MinerConfig;
use crate::eth::primitives::Address;
use crate::eth::rpc::RpcServerConfig;
use crate::eth::storage::StorageConfig;
use crate::ext::parse_duration;
use crate::infra::build_info;
use crate::infra::kafka::KafkaConfig;
use crate::infra::metrics::MetricsConfig;
use crate::infra::sentry::SentryConfig;
use crate::infra::tracing::TracingConfig;

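/// Loads the `.env` file that matches the current environment.
///
/// The `ENV` variable selects the [`Environment`] (defaulting to `Local` when unset).
/// For local runs, `LOCAL_ENV_PATH` may point to a custom file; otherwise the file is
/// resolved as `config/{binary}.env.{environment}`.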
pub fn load_dotenv_file() {
    // parse the target environment from the `ENV` variable, defaulting to local
    let env = match std::env::var("ENV") {
        Ok(env) => Environment::from_str(env.as_str()),
        Err(_) => Ok(Environment::Local),
    };

    // determine which env file to load
    let env_filename = match env {
        Ok(Environment::Local) => {
            // local environments can point to a custom file via `LOCAL_ENV_PATH`
            match std::env::var("LOCAL_ENV_PATH") {
                Ok(local_path) => local_path,
                Err(_) => format!("config/{}.env.local", build_info::binary_name()),
            }
        }
        Ok(env) => format!("config/{}.env.{}", build_info::binary_name(), env),
        Err(e) => {
            println!("{e}");
            return;
        }
    };

    println!("reading env file | filename={env_filename}");

    // load the file; a missing or invalid file is reported but not fatal
    if let Err(e) = dotenvy::from_filename(env_filename) {
        println!("env file error: {e}");
    }
}

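/// Copies legacy/short environment variables into their canonical names.
///
/// If an alias such as `CHAIN_ID` is set, its value is written to the canonical
/// variable (`EXECUTOR_CHAIN_ID`) before clap parses the configuration.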
pub fn load_env_aliases() {
    fn env_alias(canonical: &'static str, alias: &'static str) {
        if let Ok(value) = env::var(alias) {
            unsafe {
                env::set_var(canonical, value);
            }
        }
    }
    env_alias("EXECUTOR_CHAIN_ID", "CHAIN_ID");
    env_alias("EXECUTOR_EVMS", "EVMS");
    env_alias("EXECUTOR_EVMS", "NUM_EVMS");
    env_alias("EXECUTOR_REJECT_NOT_CONTRACT", "REJECT_NOT_CONTRACT");
    env_alias("EXECUTOR_STRATEGY", "STRATEGY");
    env_alias("TRACING_LOG_FORMAT", "LOG_FORMAT");
    env_alias("TRACING_URL", "TRACING_COLLECTOR_URL");
}

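/// Accessor for the [`CommonConfig`] embedded in every binary-specific configuration.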
pub trait WithCommonConfig {
    fn common(&self) -> &CommonConfig;
}

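/// Configuration shared by all binaries: environment, Tokio runtime sizing, tracing,
/// Sentry, and metrics.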
#[derive(DebugAsJson, Clone, Parser, serde::Serialize)]
#[command(author, version, about, long_about = None)]
pub struct CommonConfig {
    /// Environment the application is running in.
    #[arg(long = "env", env = "ENV", default_value = "local")]
    pub env: Environment,

    /// Number of async worker threads for the Tokio runtime.
    #[arg(long = "async-threads", env = "ASYNC_THREADS", default_value = "32")]
    pub num_async_threads: usize,

    /// Maximum number of blocking threads for the Tokio runtime.
    #[arg(long = "blocking-threads", env = "BLOCKING_THREADS", default_value = "512")]
    pub num_blocking_threads: usize,

    #[clap(flatten)]
    pub tracing: TracingConfig,

    #[clap(flatten)]
    pub sentry: Option<SentryConfig>,

    #[clap(flatten)]
    pub metrics: MetricsConfig,

    #[arg(long = "nocapture")]
    pub nocapture: bool,

    /// Whether requests from clients that do not identify themselves are accepted.
    #[arg(long = "unknown-client-enabled", env = "UNKNOWN_CLIENT_ENABLED", default_value = "true")]
    pub unknown_client_enabled: bool,
}

impl WithCommonConfig for CommonConfig {
    fn common(&self) -> &CommonConfig {
        self
    }
}

impl CommonConfig {
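    /// Builds a multi-threaded Tokio runtime sized by `num_async_threads` and
    /// `num_blocking_threads`, with threads named for easier debugging and profiling.
    ///
    /// Typical call site (sketch): `let runtime = config.init_tokio_runtime()?;` followed by
    /// `runtime.block_on(async { /* ... */ })`.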
    pub fn init_tokio_runtime(&self) -> anyhow::Result<Runtime> {
        println!(
            "creating tokio runtime | async_threads={} blocking_threads={}",
            self.num_async_threads, self.num_blocking_threads
        );

        let num_async_threads = self.num_async_threads;
        let num_blocking_threads = self.num_blocking_threads;
        let result = Builder::new_multi_thread()
            .enable_all()
            .worker_threads(num_async_threads)
            .max_blocking_threads(num_blocking_threads)
            .thread_keep_alive(Duration::from_secs(u64::MAX)) // keep blocking threads alive indefinitely
            .thread_name_fn(move || {
                // counters shared by every naming call
                static ASYNC_ID: AtomicUsize = AtomicUsize::new(1);
                static BLOCKING_ID: AtomicUsize = AtomicUsize::new(1);

                // assume the first `num_async_threads` named threads are the async workers
                let async_id = ASYNC_ID.fetch_add(1, Ordering::SeqCst);
                if async_id <= num_async_threads {
                    if cfg!(feature = "flamegraph") {
                        return "tokio-async".to_string();
                    } else {
                        return format!("tokio-async-{async_id}");
                    }
                }

                // remaining threads are blocking threads
                let blocking_id = BLOCKING_ID.fetch_add(1, Ordering::SeqCst);
                if cfg!(feature = "flamegraph") {
                    "tokio-blocking".to_string()
                } else {
                    format!("tokio-blocking-{blocking_id}")
                }
            })
            .build();

        match result {
            Ok(runtime) => Ok(runtime),
            Err(e) => {
                println!("failed to create tokio runtime | reason={e:?}");
                Err(e.into())
            }
        }
    }
}

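/// Configuration for the main Stratus node, which must run in exactly one of the
/// `leader`, `follower`, or `fake-leader` modes.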
#[derive(DebugAsJson, Clone, Parser, derive_more::Deref, serde::Serialize)]
#[clap(group = ArgGroup::new("mode").required(true).args(&["leader", "follower", "fake_leader"]))]
pub struct StratusConfig {
    #[arg(long = "leader", env = "LEADER", conflicts_with_all = ["follower", "fake_leader", "ImporterConfig"])]
    pub leader: bool,

    #[arg(long = "follower", env = "FOLLOWER", conflicts_with_all = ["leader", "fake_leader"], requires = "ImporterConfig")]
    pub follower: bool,

    #[arg(long = "fake-leader", env = "FAKE_LEADER", conflicts_with_all = ["leader", "follower"], requires = "ImporterConfig")]
    pub fake_leader: bool,

    #[clap(flatten)]
    pub rpc_server: RpcServerConfig,

    #[clap(flatten)]
    pub storage: StorageConfig,

    #[clap(flatten)]
    pub executor: ExecutorConfig,

    #[clap(flatten)]
    pub miner: MinerConfig,

    #[deref]
    #[clap(flatten)]
    pub common: CommonConfig,

    #[clap(flatten)]
    pub importer: Option<ImporterConfig>,

    #[clap(flatten)]
    pub kafka_config: Option<KafkaConfig>,
}

impl WithCommonConfig for StratusConfig {
    fn common(&self) -> &CommonConfig {
        &self.common
    }
}

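/// Configuration for the RPC downloader, which pulls data from an external RPC
/// endpoint (`--external-rpc`) into external RPC storage.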
#[derive(DebugAsJson, Clone, Parser, derive_more::Deref, serde::Serialize)]
pub struct RpcDownloaderConfig {
    #[arg(long = "block-end", env = "BLOCK_END")]
    pub block_end: Option<u64>,

    #[clap(flatten)]
    pub rpc_storage: ExternalRpcConfig,

    #[arg(short = 'r', long = "external-rpc", env = "EXTERNAL_RPC")]
    pub external_rpc: String,

    #[arg(long = "external-rpc-timeout", value_parser = parse_duration, env = "EXTERNAL_RPC_TIMEOUT", default_value = "2s")]
    pub external_rpc_timeout: Duration,

    #[arg(
        long = "external-rpc-max-response-size-bytes",
        env = "EXTERNAL_RPC_MAX_RESPONSE_SIZE_BYTES",
        default_value = "10485760"
    )]
    pub external_rpc_max_response_size_bytes: u32,

    #[arg(short = 'p', long = "paralellism", env = "PARALELLISM", default_value = "1")]
    pub paralellism: usize,

    #[arg(long = "initial-accounts", env = "INITIAL_ACCOUNTS", value_delimiter = ',')]
    pub initial_accounts: Vec<Address>,

    #[deref]
    #[clap(flatten)]
    pub common: CommonConfig,
}

impl WithCommonConfig for RpcDownloaderConfig {
    fn common(&self) -> &CommonConfig {
        &self.common
    }
}

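/// Configuration for the offline importer, which replays previously downloaded external
/// blocks from external RPC storage into the local executor, miner, and storage.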
#[derive(Parser, DebugAsJson, derive_more::Deref, serde::Serialize)]
pub struct ImporterOfflineConfig {
    #[arg(long = "block-start", env = "BLOCK_START")]
    pub block_start: Option<u64>,

    #[arg(long = "block-end", env = "BLOCK_END")]
    pub block_end: Option<u64>,

    #[arg(short = 'p', long = "paralellism", env = "PARALELLISM", default_value = "1")]
    pub paralellism: usize,

    #[arg(short = 'b', long = "blocks-by-fetch", env = "BLOCKS_BY_FETCH", default_value = "10000")]
    pub blocks_by_fetch: usize,

    #[arg(long = "block-saver-batch-size", env = "BLOCK_SAVER_BATCH_SIZE", default_value = "100")]
    pub block_saver_batch_size: usize,

    #[arg(long = "block-saver-queue-size", env = "BLOCK_SAVER_QUEUE_SIZE", default_value = "10")]
    pub block_saver_queue_size: usize,

    #[clap(flatten)]
    pub executor: ExecutorConfig,

    #[clap(flatten)]
    pub miner: MinerConfig,

    #[clap(flatten)]
    pub storage: StorageConfig,

    #[clap(flatten)]
    pub rpc_storage: ExternalRpcConfig,

    #[deref]
    #[clap(flatten)]
    pub common: CommonConfig,
}

impl WithCommonConfig for ImporterOfflineConfig {
    fn common(&self) -> &CommonConfig {
        &self.common
    }
}

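/// Deployment environment the application is running in.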
#[derive(DebugAsJson, strum::Display, strum::VariantNames, Clone, Copy, Parser, serde::Serialize)]
pub enum Environment {
    #[serde(rename = "local")]
    #[strum(to_string = "local")]
    Local,

    #[serde(rename = "staging")]
    #[strum(to_string = "staging")]
    Staging,

    #[serde(rename = "production")]
    #[strum(to_string = "production")]
    Production,

    #[serde(rename = "canary")]
    #[strum(to_string = "canary")]
    Canary,
}

impl FromStr for Environment {
    type Err = anyhow::Error;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let s = s.trim().to_lowercase();
        match s.as_ref() {
            "local" => Ok(Self::Local),
            "staging" | "test" => Ok(Self::Staging),
            "production" | "prod" => Ok(Self::Production),
            "canary" => Ok(Self::Canary),
            s => Err(anyhow!("unknown environment: \"{}\" - valid values are {:?}", s, Self::VARIANTS)),
        }
    }
}

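/// Optional path to a genesis file in JSON format.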
#[derive(DebugAsJson, Clone, Parser, serde::Serialize, Default)]
pub struct GenesisFileConfig {
    #[arg(long = "genesis-path", env = "GENESIS_JSON_PATH")]
    pub genesis_path: Option<String>,
}

#[cfg(test)]
mod tests {
    use std::env;

    use super::*;

    #[test]
    fn test_genesis_file_config() {
        // command-line argument is parsed
        let args = vec!["program", "--genesis-path", "/path/to/genesis.json"];
        let config = GenesisFileConfig::parse_from(args);
        assert_eq!(config.genesis_path, Some("/path/to/genesis.json".to_string()));

        // environment variable is used when no argument is passed
        unsafe {
            env::set_var("GENESIS_JSON_PATH", "/env/path/to/genesis.json");
        }
        let args = vec!["program"];
        let config = GenesisFileConfig::parse_from(args);
        assert_eq!(config.genesis_path, Some("/env/path/to/genesis.json".to_string()));

        // command-line argument takes precedence over the environment variable
        let args = vec!["program", "--genesis-path", "/cli/path/to/genesis.json"];
        let config = GenesisFileConfig::parse_from(args);
        assert_eq!(config.genesis_path, Some("/cli/path/to/genesis.json".to_string()));

        // cleanup
        unsafe {
            env::remove_var("GENESIS_JSON_PATH");
        }
    }
}