stratus/eth/follower/importer/
importer_config.rs

1use std::sync::Arc;
2use std::time::Duration;
3
4use clap::Parser;
5use display_json::DebugAsJson;
6use serde_json::json;
7
8use crate::GlobalState;
9use crate::NodeMode;
10use crate::eth::executor::Executor;
11use crate::eth::follower::importer::ImporterMode;
12use crate::eth::follower::importer::importer_supervisor::ImporterConsensus;
13use crate::eth::follower::importer::importer_supervisor::start_importer;
14use crate::eth::miner::Miner;
15use crate::eth::primitives::ConsensusError;
16use crate::eth::primitives::ImporterError;
17use crate::eth::primitives::StateError;
18use crate::eth::primitives::StratusError;
19use crate::eth::rpc::RpcContext;
20use crate::eth::storage::StratusStorage;
21use crate::ext::not;
22use crate::ext::parse_duration;
23use crate::ext::spawn;
24use crate::infra::BlockchainClient;
25use crate::infra::kafka::KafkaConnector;
26
27#[derive(Default, Parser, DebugAsJson, Clone, serde::Serialize, serde::Deserialize)]
28#[group(requires_all = ["external_rpc", "follower"])]
29pub struct ImporterConfig {
30    /// External RPC HTTP endpoint to sync blocks with Stratus.
31    #[arg(short = 'r', long = "external-rpc", env = "EXTERNAL_RPC", required = false)]
32    pub external_rpc: String,
33
34    /// External RPC WS endpoint to sync blocks with Stratus.
35    #[arg(short = 'w', long = "external-rpc-ws", env = "EXTERNAL_RPC_WS", required = false)]
36    pub external_rpc_ws: Option<String>,
37
38    /// Timeout for blockchain requests (importer online)
39    #[arg(long = "external-rpc-timeout", value_parser=parse_duration, env = "EXTERNAL_RPC_TIMEOUT", default_value = "2s", required = false)]
40    pub external_rpc_timeout: Duration,
41
42    /// Maximum response size in bytes for external RPC requests
43    #[arg(
44        long = "external-rpc-max-response-size-bytes",
45        env = "EXTERNAL_RPC_MAX_RESPONSE_SIZE_BYTES",
46        default_value = "10485760"
47    )]
48    pub external_rpc_max_response_size_bytes: u32,
49
50    #[arg(long = "sync-interval", value_parser=parse_duration, env = "SYNC_INTERVAL", default_value = "100ms", required = false)]
51    pub sync_interval: Duration,
52
53    /// Enable replication of block changes
54    #[arg(long = "enable-block-changes-replication", env = "ENABLE_BLOCK_CHANGES_REPLICATION", default_value = "false")]
55    pub enable_block_changes_replication: bool,
56}
57
58impl ImporterConfig {
59    pub async fn init(
60        &self,
61        executor: Arc<Executor>,
62        miner: Arc<Miner>,
63        storage: Arc<StratusStorage>,
64        kafka_connector: Option<KafkaConnector>,
65    ) -> anyhow::Result<Option<Arc<ImporterConsensus>>> {
66        match GlobalState::get_node_mode() {
67            NodeMode::Leader => Ok(None),
68            NodeMode::Follower =>
69                self.init_follower(
70                    executor,
71                    miner,
72                    storage,
73                    kafka_connector,
74                    if self.enable_block_changes_replication {
75                        ImporterMode::BlockWithChanges
76                    } else {
77                        ImporterMode::ReexecutionFollower
78                    },
79                )
80                .await,
81            NodeMode::FakeLeader => self.init_follower(executor, miner, storage, kafka_connector, ImporterMode::FakeLeader).await,
82        }
83    }
84
85    async fn init_follower(
86        &self,
87        executor: Arc<Executor>,
88        miner: Arc<Miner>,
89        storage: Arc<StratusStorage>,
90        kafka_connector: Option<KafkaConnector>,
91        importer_mode: ImporterMode,
92    ) -> anyhow::Result<Option<Arc<ImporterConsensus>>> {
93        const TASK_NAME: &str = "importer::init";
94        tracing::info!("creating importer for follower node");
95        let chain = Arc::new(
96            BlockchainClient::new_http_ws(
97                &self.external_rpc,
98                self.external_rpc_ws.as_deref(),
99                self.external_rpc_timeout,
100                self.external_rpc_max_response_size_bytes,
101            )
102            .await?,
103        );
104
105        let consensus = Arc::new(ImporterConsensus {
106            storage: Arc::clone(&storage),
107            chain: Arc::clone(&chain),
108        });
109
110        spawn(
111            TASK_NAME,
112            start_importer(importer_mode, storage, executor, miner, chain, kafka_connector, self.sync_interval),
113        );
114
115        Ok(Some(consensus))
116    }
117
118    pub async fn init_follower_importer(&self, ctx: Arc<RpcContext>) -> Result<serde_json::Value, StratusError> {
119        if GlobalState::get_node_mode() != NodeMode::Follower {
120            tracing::error!("node is currently not a follower");
121            return Err(StateError::StratusNotFollower.into());
122        }
123
124        if not(GlobalState::is_importer_shutdown()) {
125            tracing::error!("importer is already running");
126            return Err(ImporterError::AlreadyRunning.into());
127        }
128
129        GlobalState::set_importer_shutdown(false);
130
131        let consensus = match self
132            .init(
133                Arc::clone(&ctx.server.executor),
134                Arc::clone(&ctx.server.miner),
135                Arc::clone(&ctx.server.storage),
136                None,
137            )
138            .await
139        {
140            Ok(consensus) => consensus,
141            Err(e) => {
142                tracing::error!(reason = ?e, "failed to initialize importer");
143                GlobalState::set_importer_shutdown(true);
144                return Err(ImporterError::InitError.into());
145            }
146        };
147
148        match consensus {
149            Some(consensus) => {
150                ctx.server.set_importer(Some(consensus));
151            }
152            None => {
153                tracing::error!("failed to update consensus: Consensus is not set.");
154                GlobalState::set_importer_shutdown(true);
155                return Err(ConsensusError::NotSet.into());
156            }
157        }
158
159        Ok(json!(true))
160    }
161}