1use anyhow::Result;
2use anyhow::anyhow;
3use clap::Parser;
4use clap::ValueEnum;
5use display_json::DebugAsJson;
6use futures::Stream;
7use futures::StreamExt;
8use rdkafka::ClientConfig;
9use rdkafka::message::Header;
10use rdkafka::message::OwnedHeaders;
11use rdkafka::producer::DeliveryFuture;
12use rdkafka::producer::FutureProducer;
13use rdkafka::producer::FutureRecord;
14use rdkafka::producer::future_producer::OwnedDeliveryResult;
15
16use crate::infra::metrics;
17use crate::ledger::events::Event;
18use crate::log_and_err;
19
// Kafka connection settings, filled from `--kafka-*` command-line flags or the
// matching `KAFKA_*` environment variables.
//
// Plain `//` comments are used on purpose: clap's derive macros turn `///` doc
// comments into help text, which would change the CLI output.
//
// NOTE(review): this type derives `DebugAsJson` and `serde::Serialize`, so
// `sasl_password` may show up in debug/serialized output — confirm it is
// redacted wherever this config is logged.
#[derive(Parser, DebugAsJson, Clone, serde::Serialize, serde::Deserialize, Default)]
#[group(requires_all = ["bootstrap_servers", "topic", "client_id", "ImporterConfig"])]
pub struct KafkaConfig {
    // Forwarded to the client as `bootstrap.servers`.
    #[arg(long = "kafka-bootstrap-servers", env = "KAFKA_BOOTSTRAP_SERVERS", required = false)]
    pub bootstrap_servers: String,

    // Topic that every record produced by the connector is sent to.
    #[arg(long = "kafka-topic", env = "KAFKA_TOPIC", group = "kafka", required = false)]
    pub topic: String,

    // Forwarded to the client as `client.id`.
    #[arg(long = "kafka-client-id", env = "KAFKA_CLIENT_ID", required = false)]
    pub client_id: String,

    // Not read by the connector code in this file.
    #[arg(long = "kafka-group-id", env = "KAFKA_GROUP_ID", required = false)]
    pub group_id: Option<String>,

    // Selects which of the optional SASL/SSL settings below are applied.
    #[arg(long = "kafka-security-protocol", env = "KAFKA_SECURITY_PROTOCOL", required = false, default_value_t)]
    pub security_protocol: KafkaSecurityProtocol,

    // The three SASL settings are mandatory when the protocol is `SaslSsl`.
    #[arg(long = "kafka-sasl-mechanisms", env = "KAFKA_SASL_MECHANISMS", required = false)]
    pub sasl_mechanisms: Option<String>,

    #[arg(long = "kafka-sasl-username", env = "KAFKA_SASL_USERNAME", required = false)]
    pub sasl_username: Option<String>,

    #[arg(long = "kafka-sasl-password", env = "KAFKA_SASL_PASSWORD", required = false)]
    pub sasl_password: Option<String>,

    // The three SSL file locations are mandatory when the protocol is `Ssl`.
    #[arg(long = "kafka-ssl-ca-location", env = "KAFKA_SSL_CA_LOCATION", required = false)]
    pub ssl_ca_location: Option<String>,

    #[arg(long = "kafka-ssl-certificate-location", env = "KAFKA_SSL_CERTIFICATE_LOCATION", required = false)]
    pub ssl_certificate_location: Option<String>,

    #[arg(long = "kafka-ssl-key-location", env = "KAFKA_SSL_KEY_LOCATION", required = false)]
    pub ssl_key_location: Option<String>,
}
56
impl KafkaConfig {
    /// Creates a [`KafkaConnector`] from this configuration.
    ///
    /// Thin wrapper over [`KafkaConnector::new`]; its result is returned
    /// unchanged.
    pub fn init(&self) -> Result<KafkaConnector> {
        KafkaConnector::new(self)
    }
}
62
/// Handle used to publish events to a single Kafka topic.
#[derive(Clone)]
pub struct KafkaConnector {
    /// rdkafka producer used to queue records.
    producer: FutureProducer,
    /// Destination topic, copied from [`KafkaConfig::topic`].
    topic: String,
}
68
// How the producer secures its broker connection. Plain `//` comments are used
// because the `ValueEnum` derive turns `///` doc comments into CLI help text.
#[derive(Clone, Copy, serde::Serialize, serde::Deserialize, ValueEnum, Default)]
pub enum KafkaSecurityProtocol {
    // Default: no SASL or SSL settings are applied to the client.
    #[default]
    None,
    // Needs the SASL mechanisms, username and password options.
    SaslSsl,
    // Needs the SSL CA, certificate and key location options.
    Ssl,
}
76
77impl std::fmt::Display for KafkaSecurityProtocol {
78 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
79 match self {
80 KafkaSecurityProtocol::None => write!(f, "none"),
81 KafkaSecurityProtocol::SaslSsl => write!(f, "sasl_ssl"),
82 KafkaSecurityProtocol::Ssl => write!(f, "ssl"),
83 }
84 }
85}
86
87impl KafkaConnector {
88 pub fn new(config: &KafkaConfig) -> Result<Self> {
89 tracing::info!(
90 topic = %config.topic,
91 bootstrap_servers = %config.bootstrap_servers,
92 client_id = %config.client_id,
93 "Creating Kafka connector"
94 );
95
96 let security_protocol = config.security_protocol;
97 let mut client_config = ClientConfig::new()
98 .set("bootstrap.servers", &config.bootstrap_servers)
99 .set("client.id", &config.client_id)
100 .set("linger.ms", "5")
101 .set("batch.size", "1048576") .to_owned();
103
104 let producer = match security_protocol {
105 KafkaSecurityProtocol::None => client_config.create()?,
106 KafkaSecurityProtocol::SaslSsl => client_config
107 .set("security.protocol", "SASL_SSL")
108 .set(
109 "sasl.mechanisms",
110 config.sasl_mechanisms.as_ref().ok_or(anyhow!("sasl mechanisms is required"))?.as_str(),
111 )
112 .set(
113 "sasl.username",
114 config.sasl_username.as_ref().ok_or(anyhow!("sasl username is required"))?.as_str(),
115 )
116 .set(
117 "sasl.password",
118 config.sasl_password.as_ref().ok_or(anyhow!("sasl password is required"))?.as_str(),
119 )
120 .create()?,
121 KafkaSecurityProtocol::Ssl => client_config
122 .set(
123 "ssl.ca.location",
124 config.ssl_ca_location.as_ref().ok_or(anyhow!("ssl ca location is required"))?.as_str(),
125 )
126 .set(
127 "ssl.certificate.location",
128 config
129 .ssl_certificate_location
130 .as_ref()
131 .ok_or(anyhow!("ssl certificate location is required"))?
132 .as_str(),
133 )
134 .set(
135 "ssl.key.location",
136 config.ssl_key_location.as_ref().ok_or(anyhow!("ssl key location is required"))?.as_str(),
137 )
138 .create()?,
139 };
140
141 Ok(Self {
142 producer,
143 topic: config.topic.clone(),
144 })
145 }
146
147 pub fn queue_event<T: Event>(&self, event: T) -> Result<DeliveryFuture> {
148 tracing::debug!(?event, "queueing event");
149
150 let headers = event.event_headers()?;
152 let key = event.event_key()?;
153 let payload = event.event_payload()?;
154
155 let mut kafka_headers = OwnedHeaders::new_with_capacity(headers.len());
157 for (key, value) in headers.iter() {
158 let header = Header { key, value: Some(value) };
159 kafka_headers = kafka_headers.insert(header);
160 }
161 let kafka_record = FutureRecord::to(&self.topic).payload(&payload).key(&key).headers(kafka_headers);
162
163 tracing::info!(%key, %payload, ?headers, "publishing kafka event");
165 match self.producer.send_result(kafka_record) {
166 Err((e, _)) => log_and_err!(reason = e, "failed to queue kafka event"),
167 Ok(fut) => Ok(fut),
168 }
169 }
170
171 pub async fn send_event<T: Event>(&self, event: T) -> Result<()> {
172 tracing::debug!(?event, "sending event");
173 handle_delivery_result(self.queue_event(event)?.await)
174 }
175
176 pub fn create_buffer<T, I>(&self, events: I, buffer_size: usize) -> Result<impl Stream<Item = Result<()>>>
177 where
178 T: Event,
179 I: IntoIterator<Item = T>,
180 {
181 #[cfg(feature = "metrics")]
182 let start = metrics::now();
183
184 let futures: Vec<DeliveryFuture> = events
185 .into_iter()
186 .map(|event| {
187 metrics::timed(|| self.queue_event(event)).with(|m| {
188 metrics::inc_kafka_queue_event(m.elapsed);
189 })
190 })
191 .collect::<Result<Vec<_>, _>>()?; #[cfg(feature = "metrics")]
194 metrics::inc_kafka_create_buffer(start.elapsed());
195
196 Ok(futures::stream::iter(futures).buffered(buffer_size).map(handle_delivery_result))
197 }
198
199 pub async fn send_buffered<T, I>(&self, events: I, buffer_size: usize) -> Result<()>
200 where
201 T: Event,
202 I: IntoIterator<Item = T>,
203 {
204 #[cfg(feature = "metrics")]
205 let start = metrics::now();
206
207 tracing::info!(?buffer_size, "sending events");
208
209 let mut buffer = self.create_buffer(events, buffer_size)?;
210 while let Some(res) = buffer.next().await {
211 if let Err(e) = res {
212 return log_and_err!(reason = e, "failed to send events");
213 }
214 }
215
216 #[cfg(feature = "metrics")]
217 metrics::inc_kafka_send_buffered(start.elapsed());
218 Ok(())
219 }
220}
221
222fn handle_delivery_result(res: Result<OwnedDeliveryResult, futures_channel::oneshot::Canceled>) -> Result<()> {
223 match res {
224 Err(e) => log_and_err!(reason = e, "failed to publish kafka event"),
225 Ok(Err((e, _))) => log_and_err!(reason = e, "failed to publish kafka event"),
226 Ok(_) => Ok(()),
227 }
228}