stratus/eth/follower/importer/
importer_config.rs1use std::sync::Arc;
2use std::time::Duration;
3
4use clap::Parser;
5use display_json::DebugAsJson;
6use serde_json::json;
7
8use crate::GlobalState;
9use crate::NodeMode;
10use crate::eth::executor::Executor;
11use crate::eth::follower::importer::ImporterMode;
12use crate::eth::follower::importer::importer_supervisor::ImporterConsensus;
13use crate::eth::follower::importer::importer_supervisor::start_importer;
14use crate::eth::miner::Miner;
15use crate::eth::primitives::ConsensusError;
16use crate::eth::primitives::ImporterError;
17use crate::eth::primitives::StateError;
18use crate::eth::primitives::StratusError;
19use crate::eth::rpc::RpcContext;
20use crate::eth::storage::StratusStorage;
21use crate::ext::not;
22use crate::ext::parse_duration;
23use crate::ext::spawn;
24use crate::infra::BlockchainClient;
25use crate::infra::kafka::KafkaConnector;
26
27#[derive(Default, Parser, DebugAsJson, Clone, serde::Serialize, serde::Deserialize)]
28#[group(requires_all = ["external_rpc", "follower"])]
29pub struct ImporterConfig {
30 #[arg(short = 'r', long = "external-rpc", env = "EXTERNAL_RPC", required = false)]
32 pub external_rpc: String,
33
34 #[arg(short = 'w', long = "external-rpc-ws", env = "EXTERNAL_RPC_WS", required = false)]
36 pub external_rpc_ws: Option<String>,
37
38 #[arg(long = "external-rpc-timeout", value_parser=parse_duration, env = "EXTERNAL_RPC_TIMEOUT", default_value = "2s", required = false)]
40 pub external_rpc_timeout: Duration,
41
42 #[arg(
44 long = "external-rpc-max-response-size-bytes",
45 env = "EXTERNAL_RPC_MAX_RESPONSE_SIZE_BYTES",
46 default_value = "10485760"
47 )]
48 pub external_rpc_max_response_size_bytes: u32,
49
50 #[arg(long = "sync-interval", value_parser=parse_duration, env = "SYNC_INTERVAL", default_value = "100ms", required = false)]
51 pub sync_interval: Duration,
52
53 #[arg(long = "enable-block-changes-replication", env = "ENABLE_BLOCK_CHANGES_REPLICATION", default_value = "false")]
55 pub enable_block_changes_replication: bool,
56}
57
58impl ImporterConfig {
59 pub async fn init(
60 &self,
61 executor: Arc<Executor>,
62 miner: Arc<Miner>,
63 storage: Arc<StratusStorage>,
64 kafka_connector: Option<KafkaConnector>,
65 ) -> anyhow::Result<Option<Arc<ImporterConsensus>>> {
66 match GlobalState::get_node_mode() {
67 NodeMode::Leader => Ok(None),
68 NodeMode::Follower =>
69 self.init_follower(
70 executor,
71 miner,
72 storage,
73 kafka_connector,
74 if self.enable_block_changes_replication {
75 ImporterMode::BlockWithChanges
76 } else {
77 ImporterMode::ReexecutionFollower
78 },
79 )
80 .await,
81 NodeMode::FakeLeader => self.init_follower(executor, miner, storage, kafka_connector, ImporterMode::FakeLeader).await,
82 }
83 }
84
85 async fn init_follower(
86 &self,
87 executor: Arc<Executor>,
88 miner: Arc<Miner>,
89 storage: Arc<StratusStorage>,
90 kafka_connector: Option<KafkaConnector>,
91 importer_mode: ImporterMode,
92 ) -> anyhow::Result<Option<Arc<ImporterConsensus>>> {
93 const TASK_NAME: &str = "importer::init";
94 tracing::info!("creating importer for follower node");
95 let chain = Arc::new(
96 BlockchainClient::new_http_ws(
97 &self.external_rpc,
98 self.external_rpc_ws.as_deref(),
99 self.external_rpc_timeout,
100 self.external_rpc_max_response_size_bytes,
101 )
102 .await?,
103 );
104
105 let consensus = Arc::new(ImporterConsensus {
106 storage: Arc::clone(&storage),
107 chain: Arc::clone(&chain),
108 });
109
110 spawn(
111 TASK_NAME,
112 start_importer(importer_mode, storage, executor, miner, chain, kafka_connector, self.sync_interval),
113 );
114
115 Ok(Some(consensus))
116 }
117
118 pub async fn init_follower_importer(&self, ctx: Arc<RpcContext>) -> Result<serde_json::Value, StratusError> {
119 if GlobalState::get_node_mode() != NodeMode::Follower {
120 tracing::error!("node is currently not a follower");
121 return Err(StateError::StratusNotFollower.into());
122 }
123
124 if not(GlobalState::is_importer_shutdown()) {
125 tracing::error!("importer is already running");
126 return Err(ImporterError::AlreadyRunning.into());
127 }
128
129 GlobalState::set_importer_shutdown(false);
130
131 let consensus = match self
132 .init(
133 Arc::clone(&ctx.server.executor),
134 Arc::clone(&ctx.server.miner),
135 Arc::clone(&ctx.server.storage),
136 None,
137 )
138 .await
139 {
140 Ok(consensus) => consensus,
141 Err(e) => {
142 tracing::error!(reason = ?e, "failed to initialize importer");
143 GlobalState::set_importer_shutdown(true);
144 return Err(ImporterError::InitError.into());
145 }
146 };
147
148 match consensus {
149 Some(consensus) => {
150 ctx.server.set_importer(Some(consensus));
151 }
152 None => {
153 tracing::error!("failed to update consensus: Consensus is not set.");
154 GlobalState::set_importer_shutdown(true);
155 return Err(ConsensusError::NotSet.into());
156 }
157 }
158
159 Ok(json!(true))
160 }
161}