1use std::collections::VecDeque;
2use std::time::{Duration, Instant};
3
4use kafka::consumer::{Consumer, FetchOffset, GroupOffsetStorage};
5use kafka::producer::{Producer, Record, RequiredAcks};
6
7use crate::{Adapter, AdapterError, EgressAdapter, IngressAdapter};
8
9pub(crate) trait KafkaIngressClient: Send {
11 fn poll_next(&mut self, timeout: Duration) -> Result<Option<Vec<u8>>, AdapterError>;
12}
13
14pub(crate) trait KafkaEgressClient: Send {
16 fn send_msg(&mut self, payload: &[u8]) -> Result<(), AdapterError>;
17 fn flush(&mut self, timeout: Duration) -> Result<(), AdapterError>;
18}
19
20struct KafkaConsumerClient {
21 brokers: String,
22 group_id: String,
23 topic: String,
24 consumer: Consumer,
25 pending: VecDeque<Vec<u8>>,
26}
27
28impl KafkaConsumerClient {
29 fn create_consumer(
30 brokers: &str,
31 group_id: &str,
32 topic: &str,
33 ) -> Result<Consumer, AdapterError> {
34 Consumer::from_hosts(vec![brokers.to_string()])
35 .with_topic(topic.to_string())
36 .with_group(group_id.to_string())
37 .with_fallback_offset(FetchOffset::Earliest)
38 .with_offset_storage(Some(GroupOffsetStorage::Kafka))
39 .create()
40 .map_err(|err| AdapterError::Kafka {
41 op: "kafka_consumer_create",
42 detail: err.to_string(),
43 })
44 }
45
46 fn connect(brokers: &str, group_id: &str, topic: &str) -> Result<Self, AdapterError> {
47 let consumer = Self::create_consumer(brokers, group_id, topic)?;
48 Ok(Self {
49 brokers: brokers.to_string(),
50 group_id: group_id.to_string(),
51 topic: topic.to_string(),
52 consumer,
53 pending: VecDeque::new(),
54 })
55 }
56
57 fn reconnect(&mut self) -> Result<(), AdapterError> {
58 self.consumer = Self::create_consumer(
59 self.brokers.as_str(),
60 self.group_id.as_str(),
61 self.topic.as_str(),
62 )?;
63 Ok(())
64 }
65}
66
67impl KafkaIngressClient for KafkaConsumerClient {
68 fn poll_next(&mut self, timeout: Duration) -> Result<Option<Vec<u8>>, AdapterError> {
69 if let Some(payload) = self.pending.pop_front() {
70 return Ok(Some(payload));
71 }
72
73 let deadline = Instant::now() + timeout;
74 loop {
75 match self.consumer.poll() {
76 Ok(message_sets) => {
77 if message_sets.is_empty() {
78 if Instant::now() >= deadline {
79 return Ok(None);
80 }
81 std::thread::sleep(Duration::from_millis(50));
82 continue;
83 }
84
85 for message_set in message_sets.iter() {
86 for message in message_set.messages() {
87 self.pending.push_back(message.value.to_vec());
88 }
89 if let Some(last_message) = message_set.messages().last() {
90 self.consumer
91 .consume_message(
92 message_set.topic(),
93 message_set.partition(),
94 last_message.offset,
95 )
96 .map_err(|err| AdapterError::Kafka {
97 op: "kafka_consume_message",
98 detail: err.to_string(),
99 })?;
100 }
101 }
102 self.consumer
103 .commit_consumed()
104 .map_err(|err| AdapterError::Kafka {
105 op: "kafka_commit_consumed",
106 detail: err.to_string(),
107 })?;
108
109 return self.pending.pop_front().map(Some).ok_or_else(|| AdapterError::Kafka {
110 op: "kafka_poll",
111 detail: "poll returned a message set without payloads".into(),
112 });
113 }
114 Err(err) => {
115 let poll_error = err.to_string();
116 if Instant::now() >= deadline {
117 return Err(AdapterError::Kafka {
118 op: "kafka_poll",
119 detail: poll_error,
120 });
121 }
122 self.reconnect().map_err(|reconnect_error| AdapterError::Kafka {
123 op: "kafka_poll",
124 detail: format!(
125 "{}; reconnect failed: {}",
126 poll_error,
127 reconnect_error
128 ),
129 })?;
130 std::thread::sleep(Duration::from_millis(50));
131 }
132 }
133 }
134 }
135}
136
137struct KafkaProducerClient {
138 producer: Producer,
139 topic: String,
140}
141
142impl KafkaProducerClient {
143 fn connect(brokers: &str, topic: impl Into<String>) -> Result<Self, AdapterError> {
144 let producer = Producer::from_hosts(vec![brokers.to_string()])
145 .with_required_acks(RequiredAcks::One)
146 .with_ack_timeout(Duration::from_secs(5))
147 .create()
148 .map_err(|err| AdapterError::Kafka {
149 op: "kafka_producer_create",
150 detail: err.to_string(),
151 })?;
152 Ok(Self {
153 producer,
154 topic: topic.into(),
155 })
156 }
157}
158
159impl KafkaEgressClient for KafkaProducerClient {
160 fn send_msg(&mut self, payload: &[u8]) -> Result<(), AdapterError> {
161 self.producer
162 .send(&Record::from_value(&self.topic, payload))
163 .map_err(|err| AdapterError::Kafka {
164 op: "kafka_send",
165 detail: err.to_string(),
166 })
167 }
168
169 fn flush(&mut self, _timeout: Duration) -> Result<(), AdapterError> {
170 Ok(())
171 }
172}
173
174pub struct KafkaIngress {
179 name: String,
180 client: Box<dyn KafkaIngressClient>,
181 poll_timeout: Duration,
182 max_message_bytes: usize,
183}
184
185impl KafkaIngress {
186 pub fn new(
188 name: impl Into<String>,
189 brokers: &str,
190 group_id: &str,
191 topic: &str,
192 poll_timeout: Duration,
193 max_message_bytes: usize,
194 ) -> Result<Self, AdapterError> {
195 let client = KafkaConsumerClient::connect(brokers, group_id, topic)?;
196 Ok(Self {
197 name: name.into(),
198 client: Box::new(client),
199 poll_timeout,
200 max_message_bytes,
201 })
202 }
203
204 #[cfg(test)]
205 pub(crate) fn with_client(
207 name: impl Into<String>,
208 client: impl KafkaIngressClient + 'static,
209 poll_timeout: Duration,
210 max_message_bytes: usize,
211 ) -> Self {
212 Self {
213 name: name.into(),
214 client: Box::new(client),
215 poll_timeout,
216 max_message_bytes,
217 }
218 }
219}
220
221impl Adapter for KafkaIngress {
222 fn name(&self) -> &str {
223 &self.name
224 }
225
226 fn transport_kind(&self) -> &str {
227 "kafka"
228 }
229}
230
231impl IngressAdapter for KafkaIngress {
232 fn read_next(&mut self, out: &mut [u8]) -> Result<Option<usize>, AdapterError> {
233 let Some(payload) = self.client.poll_next(self.poll_timeout)? else {
234 return Ok(None);
235 };
236 if payload.len() > self.max_message_bytes || payload.len() > out.len() {
237 return Err(AdapterError::Kafka {
238 op: "kafka_read_next",
239 detail: format!(
240 "payload too large len={} max={} buf={}",
241 payload.len(),
242 self.max_message_bytes,
243 out.len()
244 ),
245 });
246 }
247 out[..payload.len()].copy_from_slice(&payload);
248 Ok(Some(payload.len()))
249 }
250}
251
252pub struct KafkaEgress {
256 name: String,
257 client: Box<dyn KafkaEgressClient>,
258 flush_timeout: Duration,
259}
260
261impl KafkaEgress {
262 pub fn new(
264 name: impl Into<String>,
265 brokers: &str,
266 topic: &str,
267 flush_timeout: Duration,
268 ) -> Result<Self, AdapterError> {
269 let client = KafkaProducerClient::connect(brokers, topic)?;
270 Ok(Self {
271 name: name.into(),
272 client: Box::new(client),
273 flush_timeout,
274 })
275 }
276
277 #[cfg(test)]
278 pub(crate) fn with_client(
280 name: impl Into<String>,
281 client: impl KafkaEgressClient + 'static,
282 flush_timeout: Duration,
283 ) -> Self {
284 Self {
285 name: name.into(),
286 client: Box::new(client),
287 flush_timeout,
288 }
289 }
290}
291
292impl Adapter for KafkaEgress {
293 fn name(&self) -> &str {
294 &self.name
295 }
296
297 fn transport_kind(&self) -> &str {
298 "kafka"
299 }
300}
301
302impl EgressAdapter for KafkaEgress {
303 fn write_msg(&mut self, msg: &[u8]) -> Result<(), AdapterError> {
304 self.client.send_msg(msg)
305 }
306
307 fn flush(&mut self) -> Result<(), AdapterError> {
308 self.client.flush(self.flush_timeout)
309 }
310}