stratus/eth/follower/importer/
importer_config.rs

1use std::sync::Arc;
2use std::time::Duration;
3
4use clap::Parser;
5use display_json::DebugAsJson;
6use serde_json::json;
7
8use super::importer::ImporterMode;
9use crate::GlobalState;
10use crate::NodeMode;
11use crate::eth::executor::Executor;
12use crate::eth::follower::importer::Importer;
13use crate::eth::miner::Miner;
14use crate::eth::primitives::ConsensusError;
15use crate::eth::primitives::ImporterError;
16use crate::eth::primitives::StateError;
17use crate::eth::primitives::StratusError;
18use crate::eth::rpc::RpcContext;
19use crate::eth::storage::StratusStorage;
20use crate::ext::not;
21use crate::ext::parse_duration;
22use crate::ext::spawn;
23use crate::infra::BlockchainClient;
24use crate::infra::kafka::KafkaConnector;
25
26#[derive(Default, Parser, DebugAsJson, Clone, serde::Serialize, serde::Deserialize)]
27#[group(requires_all = ["external_rpc", "follower"])]
28pub struct ImporterConfig {
29    /// External RPC HTTP endpoint to sync blocks with Stratus.
30    #[arg(short = 'r', long = "external-rpc", env = "EXTERNAL_RPC", required = false)]
31    pub external_rpc: String,
32
33    /// External RPC WS endpoint to sync blocks with Stratus.
34    #[arg(short = 'w', long = "external-rpc-ws", env = "EXTERNAL_RPC_WS", required = false)]
35    pub external_rpc_ws: Option<String>,
36
37    /// Timeout for blockchain requests (importer online)
38    #[arg(long = "external-rpc-timeout", value_parser=parse_duration, env = "EXTERNAL_RPC_TIMEOUT", default_value = "2s", required = false)]
39    pub external_rpc_timeout: Duration,
40
41    /// Maximum response size in bytes for external RPC requests
42    #[arg(
43        long = "external-rpc-max-response-size-bytes",
44        env = "EXTERNAL_RPC_MAX_RESPONSE_SIZE_BYTES",
45        default_value = "10485760"
46    )]
47    pub external_rpc_max_response_size_bytes: u32,
48
49    #[arg(long = "sync-interval", value_parser=parse_duration, env = "SYNC_INTERVAL", default_value = "100ms", required = false)]
50    pub sync_interval: Duration,
51
52    /// Enable replication of block changes
53    #[arg(long = "enable-block-changes-replication", env = "ENABLE_BLOCK_CHANGES_REPLICATION", default_value = "false")]
54    pub enable_block_changes_replication: bool,
55}
56
57impl ImporterConfig {
58    pub async fn init(
59        &self,
60        executor: Arc<Executor>,
61        miner: Arc<Miner>,
62        storage: Arc<StratusStorage>,
63        kafka_connector: Option<KafkaConnector>,
64    ) -> anyhow::Result<Option<Arc<Importer>>> {
65        match GlobalState::get_node_mode() {
66            NodeMode::Leader => Ok(None),
67            NodeMode::Follower =>
68                self.init_follower(
69                    executor,
70                    miner,
71                    storage,
72                    kafka_connector,
73                    if self.enable_block_changes_replication {
74                        ImporterMode::BlockWithChanges
75                    } else {
76                        ImporterMode::NormalFollower
77                    },
78                )
79                .await,
80            NodeMode::FakeLeader => self.init_follower(executor, miner, storage, kafka_connector, ImporterMode::FakeLeader).await,
81        }
82    }
83
84    async fn init_follower(
85        &self,
86        executor: Arc<Executor>,
87        miner: Arc<Miner>,
88        storage: Arc<StratusStorage>,
89        kafka_connector: Option<KafkaConnector>,
90        importer_mode: ImporterMode,
91    ) -> anyhow::Result<Option<Arc<Importer>>> {
92        const TASK_NAME: &str = "importer::init";
93        tracing::info!("creating importer for follower node");
94
95        let chain = Arc::new(
96            BlockchainClient::new_http_ws(
97                &self.external_rpc,
98                self.external_rpc_ws.as_deref(),
99                self.external_rpc_timeout,
100                self.external_rpc_max_response_size_bytes,
101            )
102            .await?,
103        );
104
105        let importer = Importer::new(
106            executor,
107            Arc::clone(&miner),
108            Arc::clone(&storage),
109            Arc::clone(&chain),
110            kafka_connector.map(Arc::new),
111            self.sync_interval,
112            importer_mode,
113        );
114        let importer = Arc::new(importer);
115
116        spawn(TASK_NAME, {
117            let importer = Arc::clone(&importer);
118            async move {
119                if let Err(e) = importer.run_importer_online().await {
120                    tracing::error!(reason = ?e, "importer-online failed");
121                }
122            }
123        });
124
125        Ok(Some(importer))
126    }
127
128    pub async fn init_follower_importer(&self, ctx: Arc<RpcContext>) -> Result<serde_json::Value, StratusError> {
129        if GlobalState::get_node_mode() != NodeMode::Follower {
130            tracing::error!("node is currently not a follower");
131            return Err(StateError::StratusNotFollower.into());
132        }
133
134        if not(GlobalState::is_importer_shutdown()) {
135            tracing::error!("importer is already running");
136            return Err(ImporterError::AlreadyRunning.into());
137        }
138
139        GlobalState::set_importer_shutdown(false);
140
141        let consensus = match self
142            .init(
143                Arc::clone(&ctx.server.executor),
144                Arc::clone(&ctx.server.miner),
145                Arc::clone(&ctx.server.storage),
146                None,
147            )
148            .await
149        {
150            Ok(consensus) => consensus,
151            Err(e) => {
152                tracing::error!(reason = ?e, "failed to initialize importer");
153                GlobalState::set_importer_shutdown(true);
154                return Err(ImporterError::InitError.into());
155            }
156        };
157
158        match consensus {
159            Some(consensus) => {
160                ctx.server.set_importer(Some(consensus));
161            }
162            None => {
163                tracing::error!("failed to update consensus: Consensus is not set.");
164                GlobalState::set_importer_shutdown(true);
165                return Err(ConsensusError::NotSet.into());
166            }
167        }
168
169        Ok(json!(true))
170    }
171}