//! Kafka ingress/egress adapters (`byteor_adapters/kafka.rs`).

use std::collections::VecDeque;
use std::time::{Duration, Instant};

use kafka::consumer::{Consumer, FetchOffset, GroupOffsetStorage};
use kafka::producer::{Producer, Record, RequiredAcks};

use crate::{Adapter, AdapterError, EgressAdapter, IngressAdapter};
8
/// Internal Kafka ingress client contract used by the adapter and crate-local tests.
pub(crate) trait KafkaIngressClient: Send {
    /// Return the next record payload, or `None` if nothing arrived within `timeout`.
    fn poll_next(&mut self, timeout: Duration) -> Result<Option<Vec<u8>>, AdapterError>;
}
13
/// Internal Kafka egress client contract used by the adapter and crate-local tests.
pub(crate) trait KafkaEgressClient: Send {
    /// Produce one record carrying `payload` to the client's configured topic.
    fn send_msg(&mut self, payload: &[u8]) -> Result<(), AdapterError>;
    /// Flush any buffered records, waiting at most `timeout`.
    fn flush(&mut self, timeout: Duration) -> Result<(), AdapterError>;
}
19
/// Consumer-backed ingress client; retains connection settings so it can reconnect.
struct KafkaConsumerClient {
    brokers: String,  // broker address, kept so `reconnect` can rebuild the consumer
    group_id: String, // consumer group, kept for `reconnect`
    topic: String,    // subscribed topic, kept for `reconnect`
    consumer: Consumer,
    pending: VecDeque<Vec<u8>>, // payloads polled but not yet handed to the adapter
}
27
28impl KafkaConsumerClient {
29    fn create_consumer(
30        brokers: &str,
31        group_id: &str,
32        topic: &str,
33    ) -> Result<Consumer, AdapterError> {
34        Consumer::from_hosts(vec![brokers.to_string()])
35            .with_topic(topic.to_string())
36            .with_group(group_id.to_string())
37            .with_fallback_offset(FetchOffset::Earliest)
38            .with_offset_storage(Some(GroupOffsetStorage::Kafka))
39            .create()
40            .map_err(|err| AdapterError::Kafka {
41                op: "kafka_consumer_create",
42                detail: err.to_string(),
43            })
44    }
45
46    fn connect(brokers: &str, group_id: &str, topic: &str) -> Result<Self, AdapterError> {
47        let consumer = Self::create_consumer(brokers, group_id, topic)?;
48        Ok(Self {
49            brokers: brokers.to_string(),
50            group_id: group_id.to_string(),
51            topic: topic.to_string(),
52            consumer,
53            pending: VecDeque::new(),
54        })
55    }
56
57    fn reconnect(&mut self) -> Result<(), AdapterError> {
58        self.consumer = Self::create_consumer(
59            self.brokers.as_str(),
60            self.group_id.as_str(),
61            self.topic.as_str(),
62        )?;
63        Ok(())
64    }
65}
66
67impl KafkaIngressClient for KafkaConsumerClient {
68    fn poll_next(&mut self, timeout: Duration) -> Result<Option<Vec<u8>>, AdapterError> {
69        if let Some(payload) = self.pending.pop_front() {
70            return Ok(Some(payload));
71        }
72
73        let deadline = Instant::now() + timeout;
74        loop {
75            match self.consumer.poll() {
76                Ok(message_sets) => {
77                    if message_sets.is_empty() {
78                        if Instant::now() >= deadline {
79                            return Ok(None);
80                        }
81                        std::thread::sleep(Duration::from_millis(50));
82                        continue;
83                    }
84
85                    for message_set in message_sets.iter() {
86                        for message in message_set.messages() {
87                            self.pending.push_back(message.value.to_vec());
88                        }
89                        if let Some(last_message) = message_set.messages().last() {
90                            self.consumer
91                                .consume_message(
92                                    message_set.topic(),
93                                    message_set.partition(),
94                                    last_message.offset,
95                                )
96                                .map_err(|err| AdapterError::Kafka {
97                                    op: "kafka_consume_message",
98                                    detail: err.to_string(),
99                                })?;
100                        }
101                    }
102                    self.consumer
103                        .commit_consumed()
104                        .map_err(|err| AdapterError::Kafka {
105                            op: "kafka_commit_consumed",
106                            detail: err.to_string(),
107                        })?;
108
109                    return self.pending.pop_front().map(Some).ok_or_else(|| AdapterError::Kafka {
110                        op: "kafka_poll",
111                        detail: "poll returned a message set without payloads".into(),
112                    });
113                }
114                Err(err) => {
115                    let poll_error = err.to_string();
116                    if Instant::now() >= deadline {
117                        return Err(AdapterError::Kafka {
118                            op: "kafka_poll",
119                            detail: poll_error,
120                        });
121                    }
122                    self.reconnect().map_err(|reconnect_error| AdapterError::Kafka {
123                        op: "kafka_poll",
124                        detail: format!(
125                            "{}; reconnect failed: {}",
126                            poll_error,
127                            reconnect_error
128                        ),
129                    })?;
130                    std::thread::sleep(Duration::from_millis(50));
131                }
132            }
133        }
134    }
135}
136
/// Producer-backed egress client targeting a single topic.
struct KafkaProducerClient {
    producer: Producer,
    topic: String, // destination topic for every record sent by `send_msg`
}
141
142impl KafkaProducerClient {
143    fn connect(brokers: &str, topic: impl Into<String>) -> Result<Self, AdapterError> {
144        let producer = Producer::from_hosts(vec![brokers.to_string()])
145            .with_required_acks(RequiredAcks::One)
146            .with_ack_timeout(Duration::from_secs(5))
147            .create()
148            .map_err(|err| AdapterError::Kafka {
149                op: "kafka_producer_create",
150                detail: err.to_string(),
151            })?;
152        Ok(Self {
153            producer,
154            topic: topic.into(),
155        })
156    }
157}
158
159impl KafkaEgressClient for KafkaProducerClient {
160    fn send_msg(&mut self, payload: &[u8]) -> Result<(), AdapterError> {
161        self.producer
162            .send(&Record::from_value(&self.topic, payload))
163            .map_err(|err| AdapterError::Kafka {
164                op: "kafka_send",
165                detail: err.to_string(),
166            })
167    }
168
169    fn flush(&mut self, _timeout: Duration) -> Result<(), AdapterError> {
170        Ok(())
171    }
172}
173
/// Minimal blocking Kafka ingress adapter.
///
/// This adapter subscribes to one Kafka topic and exposes each consumed record payload as one
/// ingress message.
pub struct KafkaIngress {
    name: String,                        // instance name reported by `Adapter::name`
    client: Box<dyn KafkaIngressClient>, // real consumer in production, fake in tests
    poll_timeout: Duration,              // maximum wait per `read_next` call
    max_message_bytes: usize,            // payloads larger than this are rejected
}
184
185impl KafkaIngress {
186    /// Create a blocking Kafka ingress adapter.
187    pub fn new(
188        name: impl Into<String>,
189        brokers: &str,
190        group_id: &str,
191        topic: &str,
192        poll_timeout: Duration,
193        max_message_bytes: usize,
194    ) -> Result<Self, AdapterError> {
195        let client = KafkaConsumerClient::connect(brokers, group_id, topic)?;
196        Ok(Self {
197            name: name.into(),
198            client: Box::new(client),
199            poll_timeout,
200            max_message_bytes,
201        })
202    }
203
204    #[cfg(test)]
205    /// Build a Kafka ingress adapter around a crate-local fake client.
206    pub(crate) fn with_client(
207        name: impl Into<String>,
208        client: impl KafkaIngressClient + 'static,
209        poll_timeout: Duration,
210        max_message_bytes: usize,
211    ) -> Self {
212        Self {
213            name: name.into(),
214            client: Box::new(client),
215            poll_timeout,
216            max_message_bytes,
217        }
218    }
219}
220
221impl Adapter for KafkaIngress {
222    fn name(&self) -> &str {
223        &self.name
224    }
225
226    fn transport_kind(&self) -> &str {
227        "kafka"
228    }
229}
230
231impl IngressAdapter for KafkaIngress {
232    fn read_next(&mut self, out: &mut [u8]) -> Result<Option<usize>, AdapterError> {
233        let Some(payload) = self.client.poll_next(self.poll_timeout)? else {
234            return Ok(None);
235        };
236        if payload.len() > self.max_message_bytes || payload.len() > out.len() {
237            return Err(AdapterError::Kafka {
238                op: "kafka_read_next",
239                detail: format!(
240                    "payload too large len={} max={} buf={}",
241                    payload.len(),
242                    self.max_message_bytes,
243                    out.len()
244                ),
245            });
246        }
247        out[..payload.len()].copy_from_slice(&payload);
248        Ok(Some(payload.len()))
249    }
250}
251
/// Minimal blocking Kafka egress adapter.
///
/// Each `write_msg()` produces one Kafka record to the configured topic.
pub struct KafkaEgress {
    name: String,                       // instance name reported by `Adapter::name`
    client: Box<dyn KafkaEgressClient>, // real producer in production, fake in tests
    flush_timeout: Duration,            // time budget passed to the client's `flush`
}
260
261impl KafkaEgress {
262    /// Create a blocking Kafka egress adapter.
263    pub fn new(
264        name: impl Into<String>,
265        brokers: &str,
266        topic: &str,
267        flush_timeout: Duration,
268    ) -> Result<Self, AdapterError> {
269        let client = KafkaProducerClient::connect(brokers, topic)?;
270        Ok(Self {
271            name: name.into(),
272            client: Box::new(client),
273            flush_timeout,
274        })
275    }
276
277    #[cfg(test)]
278    /// Build a Kafka egress adapter around a crate-local fake client.
279    pub(crate) fn with_client(
280        name: impl Into<String>,
281        client: impl KafkaEgressClient + 'static,
282        flush_timeout: Duration,
283    ) -> Self {
284        Self {
285            name: name.into(),
286            client: Box::new(client),
287            flush_timeout,
288        }
289    }
290}
291
292impl Adapter for KafkaEgress {
293    fn name(&self) -> &str {
294        &self.name
295    }
296
297    fn transport_kind(&self) -> &str {
298        "kafka"
299    }
300}
301
302impl EgressAdapter for KafkaEgress {
303    fn write_msg(&mut self, msg: &[u8]) -> Result<(), AdapterError> {
304        self.client.send_msg(msg)
305    }
306
307    fn flush(&mut self) -> Result<(), AdapterError> {
308        self.client.flush(self.flush_timeout)
309    }
310}