1use anyhow::Result;
2use clap::Parser;
3use clap::ValueEnum;
4use display_json::DebugAsJson;
5use futures::Stream;
6use futures::StreamExt;
7use rdkafka::ClientConfig;
8use rdkafka::message::Header;
9use rdkafka::message::OwnedHeaders;
10use rdkafka::producer::DeliveryFuture;
11use rdkafka::producer::FutureProducer;
12use rdkafka::producer::FutureRecord;
13use rdkafka::producer::future_producer::OwnedDeliveryResult;
14
15use crate::infra::metrics;
16use crate::ledger::events::Event;
17use crate::log_and_err;
18
19#[derive(Parser, DebugAsJson, Clone, serde::Serialize, serde::Deserialize, Default)]
20#[group(requires_all = ["bootstrap_servers", "topic", "client_id", "ImporterConfig"])]
21pub struct KafkaConfig {
22 #[arg(long = "kafka-bootstrap-servers", env = "KAFKA_BOOTSTRAP_SERVERS", required = false)]
23 pub bootstrap_servers: String,
24
25 #[arg(long = "kafka-topic", env = "KAFKA_TOPIC", group = "kafka", required = false)]
26 pub topic: String,
27
28 #[arg(long = "kafka-client-id", env = "KAFKA_CLIENT_ID", required = false)]
29 pub client_id: String,
30
31 #[arg(long = "kafka-group-id", env = "KAFKA_GROUP_ID", required = false)]
32 pub group_id: Option<String>,
33
34 #[arg(long = "kafka-security-protocol", env = "KAFKA_SECURITY_PROTOCOL", required = false, default_value_t)]
35 pub security_protocol: KafkaSecurityProtocol,
36
37 #[arg(long = "kafka-sasl-mechanisms", env = "KAFKA_SASL_MECHANISMS", required = false)]
38 pub sasl_mechanisms: Option<String>,
39
40 #[arg(long = "kafka-sasl-username", env = "KAFKA_SASL_USERNAME", required = false)]
41 pub sasl_username: Option<String>,
42
43 #[arg(long = "kafka-sasl-password", env = "KAFKA_SASL_PASSWORD", required = false)]
44 pub sasl_password: Option<String>,
45
46 #[arg(long = "kafka-ssl-ca-location", env = "KAFKA_SSL_CA_LOCATION", required = false)]
47 pub ssl_ca_location: Option<String>,
48
49 #[arg(long = "kafka-ssl-certificate-location", env = "KAFKA_SSL_CERTIFICATE_LOCATION", required = false)]
50 pub ssl_certificate_location: Option<String>,
51
52 #[arg(long = "kafka-ssl-key-location", env = "KAFKA_SSL_KEY_LOCATION", required = false)]
53 pub ssl_key_location: Option<String>,
54}
55
56impl KafkaConfig {
57 pub fn init(&self) -> Result<KafkaConnector> {
58 KafkaConnector::new(self)
59 }
60}
61
62#[derive(Clone)]
63pub struct KafkaConnector {
64 producer: FutureProducer,
65 topic: String,
66}
67
68#[derive(Clone, Copy, serde::Serialize, serde::Deserialize, ValueEnum, Default)]
69pub enum KafkaSecurityProtocol {
70 #[default]
71 None,
72 SaslSsl,
73 Ssl,
74}
75
76impl std::fmt::Display for KafkaSecurityProtocol {
77 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
78 match self {
79 KafkaSecurityProtocol::None => write!(f, "none"),
80 KafkaSecurityProtocol::SaslSsl => write!(f, "sasl_ssl"),
81 KafkaSecurityProtocol::Ssl => write!(f, "ssl"),
82 }
83 }
84}
85
86impl KafkaConnector {
87 #[allow(clippy::expect_used)]
88 pub fn new(config: &KafkaConfig) -> Result<Self> {
89 tracing::info!(
90 topic = %config.topic,
91 bootstrap_servers = %config.bootstrap_servers,
92 client_id = %config.client_id,
93 "Creating Kafka connector"
94 );
95
96 let security_protocol = config.security_protocol;
97 let mut client_config = ClientConfig::new()
98 .set("bootstrap.servers", &config.bootstrap_servers)
99 .set("client.id", &config.client_id)
100 .set("linger.ms", "5")
101 .set("batch.size", "1048576") .to_owned();
103
104 let producer = match security_protocol {
105 KafkaSecurityProtocol::None => client_config.create()?,
106 KafkaSecurityProtocol::SaslSsl => client_config
107 .set("security.protocol", "SASL_SSL")
108 .set(
109 "sasl.mechanisms",
110 config.sasl_mechanisms.as_ref().expect("sasl mechanisms is required").as_str(),
111 )
112 .set("sasl.username", config.sasl_username.as_ref().expect("sasl username is required").as_str())
113 .set("sasl.password", config.sasl_password.as_ref().expect("sasl password is required").as_str())
114 .create()?,
115 KafkaSecurityProtocol::Ssl => client_config
116 .set(
117 "ssl.ca.location",
118 config.ssl_ca_location.as_ref().expect("ssl ca location is required").as_str(),
119 )
120 .set(
121 "ssl.certificate.location",
122 config.ssl_certificate_location.as_ref().expect("ssl certificate location is required").as_str(),
123 )
124 .set(
125 "ssl.key.location",
126 config.ssl_key_location.as_ref().expect("ssl key location is required").as_str(),
127 )
128 .create()?,
129 };
130
131 Ok(Self {
132 producer,
133 topic: config.topic.clone(),
134 })
135 }
136
137 pub fn queue_event<T: Event>(&self, event: T) -> Result<DeliveryFuture> {
138 tracing::debug!(?event, "queueing event");
139
140 let headers = event.event_headers()?;
142 let key = event.event_key()?;
143 let payload = event.event_payload()?;
144
145 let mut kafka_headers = OwnedHeaders::new_with_capacity(headers.len());
147 for (key, value) in headers.iter() {
148 let header = Header { key, value: Some(value) };
149 kafka_headers = kafka_headers.insert(header);
150 }
151 let kafka_record = FutureRecord::to(&self.topic).payload(&payload).key(&key).headers(kafka_headers);
152
153 tracing::info!(%key, %payload, ?headers, "publishing kafka event");
155 match self.producer.send_result(kafka_record) {
156 Err((e, _)) => log_and_err!(reason = e, "failed to queue kafka event"),
157 Ok(fut) => Ok(fut),
158 }
159 }
160
161 pub async fn send_event<T: Event>(&self, event: T) -> Result<()> {
162 tracing::debug!(?event, "sending event");
163 handle_delivery_result(self.queue_event(event)?.await)
164 }
165
166 pub fn create_buffer<T, I>(&self, events: I, buffer_size: usize) -> Result<impl Stream<Item = Result<()>>>
167 where
168 T: Event,
169 I: IntoIterator<Item = T>,
170 {
171 #[cfg(feature = "metrics")]
172 let start = metrics::now();
173
174 let futures: Vec<DeliveryFuture> = events
175 .into_iter()
176 .map(|event| {
177 metrics::timed(|| self.queue_event(event)).with(|m| {
178 metrics::inc_kafka_queue_event(m.elapsed);
179 })
180 })
181 .collect::<Result<Vec<_>, _>>()?; #[cfg(feature = "metrics")]
184 metrics::inc_kafka_create_buffer(start.elapsed());
185
186 Ok(futures::stream::iter(futures).buffered(buffer_size).map(handle_delivery_result))
187 }
188
189 pub async fn send_buffered<T, I>(&self, events: I, buffer_size: usize) -> Result<()>
190 where
191 T: Event,
192 I: IntoIterator<Item = T>,
193 {
194 #[cfg(feature = "metrics")]
195 let start = metrics::now();
196
197 tracing::info!(?buffer_size, "sending events");
198
199 let mut buffer = self.create_buffer(events, buffer_size)?;
200 while let Some(res) = buffer.next().await {
201 if let Err(e) = res {
202 return log_and_err!(reason = e, "failed to send events");
203 }
204 }
205
206 #[cfg(feature = "metrics")]
207 metrics::inc_kafka_send_buffered(start.elapsed());
208 Ok(())
209 }
210}
211
212fn handle_delivery_result(res: Result<OwnedDeliveryResult, futures_channel::oneshot::Canceled>) -> Result<()> {
213 match res {
214 Err(e) => log_and_err!(reason = e, "failed to publish kafka event"),
215 Ok(Err((e, _))) => log_and_err!(reason = e, "failed to publish kafka event"),
216 Ok(_) => Ok(()),
217 }
218}