stratus/infra/kafka/kafka.rs

use anyhow::Result;
use clap::Parser;
use clap::ValueEnum;
use display_json::DebugAsJson;
use futures::Stream;
use futures::StreamExt;
use rdkafka::ClientConfig;
use rdkafka::message::Header;
use rdkafka::message::OwnedHeaders;
use rdkafka::producer::DeliveryFuture;
use rdkafka::producer::FutureProducer;
use rdkafka::producer::FutureRecord;
use rdkafka::producer::future_producer::OwnedDeliveryResult;

use crate::infra::metrics;
use crate::ledger::events::Event;
use crate::log_and_err;

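/// Configuration for connecting to Kafka, parsed by clap from CLI flags or
/// the corresponding `KAFKA_*` environment variables.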
#[derive(Parser, DebugAsJson, Clone, serde::Serialize, serde::Deserialize, Default)]
#[group(requires_all = ["bootstrap_servers", "topic", "client_id", "ImporterConfig"])]
pub struct KafkaConfig {
    #[arg(long = "kafka-bootstrap-servers", env = "KAFKA_BOOTSTRAP_SERVERS", required = false)]
    pub bootstrap_servers: String,

    #[arg(long = "kafka-topic", env = "KAFKA_TOPIC", group = "kafka", required = false)]
    pub topic: String,

    #[arg(long = "kafka-client-id", env = "KAFKA_CLIENT_ID", required = false)]
    pub client_id: String,

    #[arg(long = "kafka-group-id", env = "KAFKA_GROUP_ID", required = false)]
    pub group_id: Option<String>,

    #[arg(long = "kafka-security-protocol", env = "KAFKA_SECURITY_PROTOCOL", required = false, default_value_t)]
    pub security_protocol: KafkaSecurityProtocol,

    #[arg(long = "kafka-sasl-mechanisms", env = "KAFKA_SASL_MECHANISMS", required = false)]
    pub sasl_mechanisms: Option<String>,

    #[arg(long = "kafka-sasl-username", env = "KAFKA_SASL_USERNAME", required = false)]
    pub sasl_username: Option<String>,

    #[arg(long = "kafka-sasl-password", env = "KAFKA_SASL_PASSWORD", required = false)]
    pub sasl_password: Option<String>,

    #[arg(long = "kafka-ssl-ca-location", env = "KAFKA_SSL_CA_LOCATION", required = false)]
    pub ssl_ca_location: Option<String>,

    #[arg(long = "kafka-ssl-certificate-location", env = "KAFKA_SSL_CERTIFICATE_LOCATION", required = false)]
    pub ssl_certificate_location: Option<String>,

    #[arg(long = "kafka-ssl-key-location", env = "KAFKA_SSL_KEY_LOCATION", required = false)]
    pub ssl_key_location: Option<String>,
}

impl KafkaConfig {
    pub fn init(&self) -> Result<KafkaConnector> {
        KafkaConnector::new(self)
    }
}

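/// Thin wrapper around an rdkafka `FutureProducer` bound to a single topic.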
#[derive(Clone)]
pub struct KafkaConnector {
    producer: FutureProducer,
    topic: String,
}

#[derive(Clone, Copy, serde::Serialize, serde::Deserialize, ValueEnum, Default)]
pub enum KafkaSecurityProtocol {
    #[default]
    None,
    SaslSsl,
    Ssl,
}

impl std::fmt::Display for KafkaSecurityProtocol {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            KafkaSecurityProtocol::None => write!(f, "none"),
            KafkaSecurityProtocol::SaslSsl => write!(f, "sasl_ssl"),
            KafkaSecurityProtocol::Ssl => write!(f, "ssl"),
        }
    }
}

impl KafkaConnector {
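    /// Builds the underlying producer, applying SASL/SSL settings according
    /// to the configured security protocol.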
    #[allow(clippy::expect_used)]
    pub fn new(config: &KafkaConfig) -> Result<Self> {
        tracing::info!(
            topic = %config.topic,
            bootstrap_servers = %config.bootstrap_servers,
            client_id = %config.client_id,
            "Creating Kafka connector"
        );

        let security_protocol = config.security_protocol;
        let mut client_config = ClientConfig::new()
            .set("bootstrap.servers", &config.bootstrap_servers)
            .set("client.id", &config.client_id)
            .set("linger.ms", "5")
            .set("batch.size", "1048576") // 1 MB
            .to_owned();

        let producer = match security_protocol {
            KafkaSecurityProtocol::None => client_config.create()?,
            KafkaSecurityProtocol::SaslSsl => client_config
                .set("security.protocol", "SASL_SSL")
                .set(
                    "sasl.mechanisms",
                    config.sasl_mechanisms.as_ref().expect("sasl mechanisms is required").as_str(),
                )
                .set("sasl.username", config.sasl_username.as_ref().expect("sasl username is required").as_str())
                .set("sasl.password", config.sasl_password.as_ref().expect("sasl password is required").as_str())
                .create()?,
            KafkaSecurityProtocol::Ssl => client_config
                // without this, librdkafka defaults to plaintext and the ssl.* settings are ignored
                .set("security.protocol", "ssl")
                .set(
                    "ssl.ca.location",
                    config.ssl_ca_location.as_ref().expect("ssl ca location is required").as_str(),
                )
                .set(
                    "ssl.certificate.location",
                    config.ssl_certificate_location.as_ref().expect("ssl certificate location is required").as_str(),
                )
                .set(
                    "ssl.key.location",
                    config.ssl_key_location.as_ref().expect("ssl key location is required").as_str(),
                )
                .create()?,
        };

        Ok(Self {
            producer,
            topic: config.topic.clone(),
        })
    }

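    /// Serializes the event and enqueues it on the producer without blocking;
    /// the returned `DeliveryFuture` resolves once the broker acknowledges
    /// (or rejects) the message.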
    pub fn queue_event<T: Event>(&self, event: T) -> Result<DeliveryFuture> {
        tracing::debug!(?event, "queueing event");

        // prepare base payload
        let headers = event.event_headers()?;
        let key = event.event_key()?;
        let payload = event.event_payload()?;

        // prepare kafka payload
        let mut kafka_headers = OwnedHeaders::new_with_capacity(headers.len());
        for (key, value) in headers.iter() {
            let header = Header { key, value: Some(value) };
            kafka_headers = kafka_headers.insert(header);
        }
        let kafka_record = FutureRecord::to(&self.topic).payload(&payload).key(&key).headers(kafka_headers);

        // publish and handle response
        tracing::info!(%key, %payload, ?headers, "publishing kafka event");
        match self.producer.send_result(kafka_record) {
            Err((e, _)) => log_and_err!(reason = e, "failed to queue kafka event"),
            Ok(fut) => Ok(fut),
        }
    }

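    /// Queues a single event and awaits its delivery result.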
    pub async fn send_event<T: Event>(&self, event: T) -> Result<()> {
        tracing::debug!(?event, "sending event");
        handle_delivery_result(self.queue_event(event)?.await)
    }

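    /// Queues all events up front, then returns a stream that awaits up to
    /// `buffer_size` pending deliveries concurrently, yielding results in
    /// queueing order.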
    pub fn create_buffer<T, I>(&self, events: I, buffer_size: usize) -> Result<impl Stream<Item = Result<()>>>
    where
        T: Event,
        I: IntoIterator<Item = T>,
    {
        #[cfg(feature = "metrics")]
        let start = metrics::now();

        let futures: Vec<DeliveryFuture> = events
            .into_iter()
            .map(|event| {
                metrics::timed(|| self.queue_event(event)).with(|m| {
                    metrics::inc_kafka_queue_event(m.elapsed);
                })
            })
            .collect::<Result<Vec<_>, _>>()?; // queueing fails if the producer's in-memory queue is full

        #[cfg(feature = "metrics")]
        metrics::inc_kafka_create_buffer(start.elapsed());

        Ok(futures::stream::iter(futures).buffered(buffer_size).map(handle_delivery_result))
    }

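    /// Sends all events through a buffered stream, returning the first
    /// delivery error encountered.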
    pub async fn send_buffered<T, I>(&self, events: I, buffer_size: usize) -> Result<()>
    where
        T: Event,
        I: IntoIterator<Item = T>,
    {
        #[cfg(feature = "metrics")]
        let start = metrics::now();

        tracing::info!(?buffer_size, "sending events");

        let mut buffer = self.create_buffer(events, buffer_size)?;
        while let Some(res) = buffer.next().await {
            if let Err(e) = res {
                return log_and_err!(reason = e, "failed to send events");
            }
        }

        #[cfg(feature = "metrics")]
        metrics::inc_kafka_send_buffered(start.elapsed());
        Ok(())
    }
}

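/// Flattens rdkafka's nested delivery result: the outer `Canceled` error means
/// the producer was dropped before a delivery report arrived; the inner error
/// is a broker-side delivery failure.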
fn handle_delivery_result(res: Result<OwnedDeliveryResult, futures_channel::oneshot::Canceled>) -> Result<()> {
    match res {
        Err(e) => log_and_err!(reason = e, "failed to publish kafka event"),
        Ok(Err((e, _))) => log_and_err!(reason = e, "failed to publish kafka event"),
        Ok(_) => Ok(()),
    }
}
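
// A minimal usage sketch (not part of this module): parse the config from CLI
// flags or `KAFKA_*` env vars, build the connector, and publish a batch.
// `MyEvent` is a hypothetical type assumed to implement
// `crate::ledger::events::Event`.
//
//     use clap::Parser;
//
//     async fn publish(events: Vec<MyEvent>) -> Result<()> {
//         let config = KafkaConfig::parse();        // CLI flags / KAFKA_* env vars
//         let connector = config.init()?;           // builds the FutureProducer
//         connector.send_buffered(events, 50).await // await up to 50 deliveries at a time
//     }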