// stratus/eth/follower/importer/importer_config.rs
use std::sync::Arc;
2use std::time::Duration;
3
4use clap::Parser;
5use display_json::DebugAsJson;
6use serde_json::json;
7
8use super::importer::ImporterMode;
9use crate::GlobalState;
10use crate::NodeMode;
11use crate::eth::executor::Executor;
12use crate::eth::follower::importer::Importer;
13use crate::eth::miner::Miner;
14use crate::eth::primitives::ConsensusError;
15use crate::eth::primitives::ImporterError;
16use crate::eth::primitives::StateError;
17use crate::eth::primitives::StratusError;
18use crate::eth::rpc::RpcContext;
19use crate::eth::storage::StratusStorage;
20use crate::ext::not;
21use crate::ext::parse_duration;
22use crate::ext::spawn;
23use crate::infra::BlockchainClient;
24use crate::infra::kafka::KafkaConnector;
25
26#[derive(Default, Parser, DebugAsJson, Clone, serde::Serialize, serde::Deserialize)]
27#[group(requires_all = ["external_rpc", "follower"])]
28pub struct ImporterConfig {
29 #[arg(short = 'r', long = "external-rpc", env = "EXTERNAL_RPC", required = false)]
31 pub external_rpc: String,
32
33 #[arg(short = 'w', long = "external-rpc-ws", env = "EXTERNAL_RPC_WS", required = false)]
35 pub external_rpc_ws: Option<String>,
36
37 #[arg(long = "external-rpc-timeout", value_parser=parse_duration, env = "EXTERNAL_RPC_TIMEOUT", default_value = "2s", required = false)]
39 pub external_rpc_timeout: Duration,
40
41 #[arg(
43 long = "external-rpc-max-response-size-bytes",
44 env = "EXTERNAL_RPC_MAX_RESPONSE_SIZE_BYTES",
45 default_value = "10485760"
46 )]
47 pub external_rpc_max_response_size_bytes: u32,
48
49 #[arg(long = "sync-interval", value_parser=parse_duration, env = "SYNC_INTERVAL", default_value = "100ms", required = false)]
50 pub sync_interval: Duration,
51
52 #[arg(long = "enable-block-changes-replication", env = "ENABLE_BLOCK_CHANGES_REPLICATION", default_value = "false")]
54 pub enable_block_changes_replication: bool,
55}
56
57impl ImporterConfig {
58 pub async fn init(
59 &self,
60 executor: Arc<Executor>,
61 miner: Arc<Miner>,
62 storage: Arc<StratusStorage>,
63 kafka_connector: Option<KafkaConnector>,
64 ) -> anyhow::Result<Option<Arc<Importer>>> {
65 match GlobalState::get_node_mode() {
66 NodeMode::Leader => Ok(None),
67 NodeMode::Follower =>
68 self.init_follower(
69 executor,
70 miner,
71 storage,
72 kafka_connector,
73 if self.enable_block_changes_replication {
74 ImporterMode::BlockWithChanges
75 } else {
76 ImporterMode::NormalFollower
77 },
78 )
79 .await,
80 NodeMode::FakeLeader => self.init_follower(executor, miner, storage, kafka_connector, ImporterMode::FakeLeader).await,
81 }
82 }
83
84 async fn init_follower(
85 &self,
86 executor: Arc<Executor>,
87 miner: Arc<Miner>,
88 storage: Arc<StratusStorage>,
89 kafka_connector: Option<KafkaConnector>,
90 importer_mode: ImporterMode,
91 ) -> anyhow::Result<Option<Arc<Importer>>> {
92 const TASK_NAME: &str = "importer::init";
93 tracing::info!("creating importer for follower node");
94
95 let chain = Arc::new(
96 BlockchainClient::new_http_ws(
97 &self.external_rpc,
98 self.external_rpc_ws.as_deref(),
99 self.external_rpc_timeout,
100 self.external_rpc_max_response_size_bytes,
101 )
102 .await?,
103 );
104
105 let importer = Importer::new(
106 executor,
107 Arc::clone(&miner),
108 Arc::clone(&storage),
109 Arc::clone(&chain),
110 kafka_connector.map(Arc::new),
111 self.sync_interval,
112 importer_mode,
113 );
114 let importer = Arc::new(importer);
115
116 spawn(TASK_NAME, {
117 let importer = Arc::clone(&importer);
118 async move {
119 if let Err(e) = importer.run_importer_online().await {
120 tracing::error!(reason = ?e, "importer-online failed");
121 }
122 }
123 });
124
125 Ok(Some(importer))
126 }
127
128 pub async fn init_follower_importer(&self, ctx: Arc<RpcContext>) -> Result<serde_json::Value, StratusError> {
129 if GlobalState::get_node_mode() != NodeMode::Follower {
130 tracing::error!("node is currently not a follower");
131 return Err(StateError::StratusNotFollower.into());
132 }
133
134 if not(GlobalState::is_importer_shutdown()) {
135 tracing::error!("importer is already running");
136 return Err(ImporterError::AlreadyRunning.into());
137 }
138
139 GlobalState::set_importer_shutdown(false);
140
141 let consensus = match self
142 .init(
143 Arc::clone(&ctx.server.executor),
144 Arc::clone(&ctx.server.miner),
145 Arc::clone(&ctx.server.storage),
146 None,
147 )
148 .await
149 {
150 Ok(consensus) => consensus,
151 Err(e) => {
152 tracing::error!(reason = ?e, "failed to initialize importer");
153 GlobalState::set_importer_shutdown(true);
154 return Err(ImporterError::InitError.into());
155 }
156 };
157
158 match consensus {
159 Some(consensus) => {
160 ctx.server.set_importer(Some(consensus));
161 }
162 None => {
163 tracing::error!("failed to update consensus: Consensus is not set.");
164 GlobalState::set_importer_shutdown(true);
165 return Err(ConsensusError::NotSet.into());
166 }
167 }
168
169 Ok(json!(true))
170 }
171}