// stratus/infra/kafka/kafka.rs

use anyhow::Result;
use anyhow::anyhow;
use clap::Parser;
use clap::ValueEnum;
use display_json::DebugAsJson;
use futures::Stream;
use futures::StreamExt;
use rdkafka::ClientConfig;
use rdkafka::message::Header;
use rdkafka::message::OwnedHeaders;
use rdkafka::producer::DeliveryFuture;
use rdkafka::producer::FutureProducer;
use rdkafka::producer::FutureRecord;
use rdkafka::producer::future_producer::OwnedDeliveryResult;

use crate::infra::metrics;
use crate::ledger::events::Event;
use crate::log_and_err;

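/// Kafka integration settings, parsed from CLI flags or environment variables via clap.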
#[derive(Parser, DebugAsJson, Clone, serde::Serialize, serde::Deserialize, Default)]
#[group(requires_all = ["bootstrap_servers", "topic", "client_id", "ImporterConfig"])]
pub struct KafkaConfig {
    #[arg(long = "kafka-bootstrap-servers", env = "KAFKA_BOOTSTRAP_SERVERS", required = false)]
    pub bootstrap_servers: String,

    #[arg(long = "kafka-topic", env = "KAFKA_TOPIC", group = "kafka", required = false)]
    pub topic: String,

    #[arg(long = "kafka-client-id", env = "KAFKA_CLIENT_ID", required = false)]
    pub client_id: String,

    #[arg(long = "kafka-group-id", env = "KAFKA_GROUP_ID", required = false)]
    pub group_id: Option<String>,

    #[arg(long = "kafka-security-protocol", env = "KAFKA_SECURITY_PROTOCOL", required = false, default_value_t)]
    pub security_protocol: KafkaSecurityProtocol,

    #[arg(long = "kafka-sasl-mechanisms", env = "KAFKA_SASL_MECHANISMS", required = false)]
    pub sasl_mechanisms: Option<String>,

    #[arg(long = "kafka-sasl-username", env = "KAFKA_SASL_USERNAME", required = false)]
    pub sasl_username: Option<String>,

    #[arg(long = "kafka-sasl-password", env = "KAFKA_SASL_PASSWORD", required = false)]
    pub sasl_password: Option<String>,

    #[arg(long = "kafka-ssl-ca-location", env = "KAFKA_SSL_CA_LOCATION", required = false)]
    pub ssl_ca_location: Option<String>,

    #[arg(long = "kafka-ssl-certificate-location", env = "KAFKA_SSL_CERTIFICATE_LOCATION", required = false)]
    pub ssl_certificate_location: Option<String>,

    #[arg(long = "kafka-ssl-key-location", env = "KAFKA_SSL_KEY_LOCATION", required = false)]
    pub ssl_key_location: Option<String>,
}

impl KafkaConfig {
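    /// Builds a [`KafkaConnector`] from this configuration.
    ///
    /// A minimal usage sketch with hypothetical values; the remaining fields
    /// fall back to `Default`:
    ///
    /// ```ignore
    /// let connector = KafkaConfig {
    ///     bootstrap_servers: "localhost:9092".into(),
    ///     topic: "stratus-events".into(),
    ///     client_id: "stratus".into(),
    ///     ..Default::default()
    /// }
    /// .init()?;
    /// ```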
    pub fn init(&self) -> Result<KafkaConnector> {
        KafkaConnector::new(self)
    }
}

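/// Thin wrapper around an rdkafka [`FutureProducer`] bound to a single topic.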
#[derive(Clone)]
pub struct KafkaConnector {
    producer: FutureProducer,
    topic: String,
}

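/// Transport security for the broker connection. `SaslSsl` additionally requires
/// the `sasl_*` credentials and `Ssl` the `ssl_*` certificate locations (see
/// [`KafkaConnector::new`]).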
#[derive(Clone, Copy, serde::Serialize, serde::Deserialize, ValueEnum, Default)]
pub enum KafkaSecurityProtocol {
    #[default]
    None,
    SaslSsl,
    Ssl,
}

impl std::fmt::Display for KafkaSecurityProtocol {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            KafkaSecurityProtocol::None => write!(f, "none"),
            KafkaSecurityProtocol::SaslSsl => write!(f, "sasl_ssl"),
            KafkaSecurityProtocol::Ssl => write!(f, "ssl"),
        }
    }
}

impl KafkaConnector {
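    /// Creates the underlying producer, validating that the options required by
    /// the configured [`KafkaSecurityProtocol`] are present.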
    pub fn new(config: &KafkaConfig) -> Result<Self> {
        tracing::info!(
            topic = %config.topic,
            bootstrap_servers = %config.bootstrap_servers,
            client_id = %config.client_id,
            "Creating Kafka connector"
        );

        let security_protocol = config.security_protocol;
        let mut client_config = ClientConfig::new()
            .set("bootstrap.servers", &config.bootstrap_servers)
            .set("client.id", &config.client_id)
            .set("linger.ms", "5")
            .set("batch.size", "1048576") // 1 MB
            .to_owned();

        let producer = match security_protocol {
            KafkaSecurityProtocol::None => client_config.create()?,
            KafkaSecurityProtocol::SaslSsl => client_config
                .set("security.protocol", "SASL_SSL")
                .set(
                    "sasl.mechanisms",
                    config.sasl_mechanisms.as_ref().ok_or(anyhow!("sasl mechanisms is required"))?.as_str(),
                )
                .set(
                    "sasl.username",
                    config.sasl_username.as_ref().ok_or(anyhow!("sasl username is required"))?.as_str(),
                )
                .set(
                    "sasl.password",
                    config.sasl_password.as_ref().ok_or(anyhow!("sasl password is required"))?.as_str(),
                )
                .create()?,
            KafkaSecurityProtocol::Ssl => client_config
                .set(
                    "ssl.ca.location",
                    config.ssl_ca_location.as_ref().ok_or(anyhow!("ssl ca location is required"))?.as_str(),
                )
                .set(
                    "ssl.certificate.location",
                    config
                        .ssl_certificate_location
                        .as_ref()
                        .ok_or(anyhow!("ssl certificate location is required"))?
                        .as_str(),
                )
                .set(
                    "ssl.key.location",
                    config.ssl_key_location.as_ref().ok_or(anyhow!("ssl key location is required"))?.as_str(),
                )
                .create()?,
        };

        Ok(Self {
            producer,
            topic: config.topic.clone(),
        })
    }

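    /// Serializes the event into key, payload and headers and enqueues it on the
    /// producer without awaiting delivery; the returned [`DeliveryFuture`]
    /// resolves once the broker acknowledges (or rejects) the message.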
    pub fn queue_event<T: Event>(&self, event: T) -> Result<DeliveryFuture> {
        tracing::debug!(?event, "queueing event");

        // prepare base payload
        let headers = event.event_headers()?;
        let key = event.event_key()?;
        let payload = event.event_payload()?;

        // prepare kafka payload
        let mut kafka_headers = OwnedHeaders::new_with_capacity(headers.len());
        for (key, value) in headers.iter() {
            let header = Header { key, value: Some(value) };
            kafka_headers = kafka_headers.insert(header);
        }
        let kafka_record = FutureRecord::to(&self.topic).payload(&payload).key(&key).headers(kafka_headers);

        // publish and handle response
        tracing::info!(%key, %payload, ?headers, "publishing kafka event");
        match self.producer.send_result(kafka_record) {
            Err((e, _)) => log_and_err!(reason = e, "failed to queue kafka event"),
            Ok(fut) => Ok(fut),
        }
    }

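    /// Queues a single event and awaits its delivery report.
    ///
    /// A minimal usage sketch (`connector` and `event` assumed to exist):
    ///
    /// ```ignore
    /// connector.send_event(event).await?;
    /// ```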
    pub async fn send_event<T: Event>(&self, event: T) -> Result<()> {
        tracing::debug!(?event, "sending event");
        handle_delivery_result(self.queue_event(event)?.await)
    }

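    /// Queues all events up front and returns a stream of delivery results,
    /// awaiting up to `buffer_size` delivery reports concurrently.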
    pub fn create_buffer<T, I>(&self, events: I, buffer_size: usize) -> Result<impl Stream<Item = Result<()>>>
    where
        T: Event,
        I: IntoIterator<Item = T>,
    {
        #[cfg(feature = "metrics")]
        let start = metrics::now();

        let futures: Vec<DeliveryFuture> = events
            .into_iter()
            .map(|event| {
                metrics::timed(|| self.queue_event(event)).with(|m| {
                    metrics::inc_kafka_queue_event(m.elapsed);
                })
            })
            .collect::<Result<Vec<_>, _>>()?; // fails fast if any event cannot be queued, e.g. when the producer queue is full

        #[cfg(feature = "metrics")]
        metrics::inc_kafka_create_buffer(start.elapsed());

        Ok(futures::stream::iter(futures).buffered(buffer_size).map(handle_delivery_result))
    }

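    /// Sends a batch of events, returning on the first delivery error.
    ///
    /// A minimal usage sketch (`connector` and an `events` iterator assumed to exist):
    ///
    /// ```ignore
    /// connector.send_buffered(events, 16).await?;
    /// ```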
    pub async fn send_buffered<T, I>(&self, events: I, buffer_size: usize) -> Result<()>
    where
        T: Event,
        I: IntoIterator<Item = T>,
    {
        #[cfg(feature = "metrics")]
        let start = metrics::now();

        tracing::info!(?buffer_size, "sending events");

        let mut buffer = self.create_buffer(events, buffer_size)?;
        while let Some(res) = buffer.next().await {
            if let Err(e) = res {
                return log_and_err!(reason = e, "failed to send events");
            }
        }

        #[cfg(feature = "metrics")]
        metrics::inc_kafka_send_buffered(start.elapsed());
        Ok(())
    }
}

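/// Flattens a delivery report into a plain `Result`: the outer error means the
/// delivery channel was canceled, the inner one a broker-side delivery failure.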
fn handle_delivery_result(res: Result<OwnedDeliveryResult, futures_channel::oneshot::Canceled>) -> Result<()> {
    match res {
        Err(e) => log_and_err!(reason = e, "failed to publish kafka event"),
        Ok(Err((e, _))) => log_and_err!(reason = e, "failed to publish kafka event"),
        Ok(_) => Ok(()),
    }
}