1use std::mem;
2use std::sync::{Arc, Mutex};
3use std::time::Duration;
4
5use futures_util::StreamExt;
6use rabbitmq_stream_client::types::{Message, OffsetSpecification};
7use rabbitmq_stream_client::{Environment, NoDedup, Producer};
8use tokio::runtime::Runtime;
9
10use crate::{Adapter, AdapterError, EgressAdapter, IngressAdapter};
11
/// One delivery handed from a stream client to the ingress adapter.
pub(crate) struct RabbitMqStreamsIngressMessage {
    /// Message body bytes.
    pub payload: Vec<u8>,
    /// Offset associated with the delivery.
    pub offset: u64,
}
16
/// Status snapshot reported by the ingress adapter.
#[derive(Debug)]
struct RabbitMqStreamsIngressStatus {
    /// Name of the stream being consumed.
    stream: String,
    /// Consumer name, when one was configured.
    consumer_name: Option<String>,
    /// Textual form of the offset specification the consumer was created with.
    offset_specification: String,
    /// Offset of the most recent delivery copied to a caller; `None` until
    /// the first delivery is recorded.
    last_delivery_offset: Option<u64>,
}
24
25impl RabbitMqStreamsIngressStatus {
26 fn status_fields(&self) -> serde_json::Map<String, serde_json::Value> {
27 let mut details = serde_json::Map::new();
28 details.insert(
29 "stream".to_string(),
30 serde_json::Value::String(self.stream.clone()),
31 );
32 details.insert(
33 "offset_specification".to_string(),
34 serde_json::Value::String(self.offset_specification.clone()),
35 );
36 if let Some(consumer_name) = &self.consumer_name {
37 details.insert(
38 "consumer_name".to_string(),
39 serde_json::Value::String(consumer_name.clone()),
40 );
41 }
42 if let Some(offset) = self.last_delivery_offset {
43 details.insert(
44 "consumer_offset".to_string(),
45 serde_json::Value::from(offset),
46 );
47 }
48 details
49 }
50}
51
/// Status snapshot reported by the egress adapter.
#[derive(Debug)]
struct RabbitMqStreamsEgressStatus {
    /// Name of the stream being published to.
    stream: String,
    /// Publisher name, when one was configured.
    publisher_name: Option<String>,
    /// Publishing id returned by the most recent successful publish, if any.
    last_confirmed_publishing_id: Option<u64>,
    /// Number of buffered messages as last recorded by `write_msg`/`flush`.
    pending_messages: u64,
    /// Message count recorded by `flush` for its most recent batch.
    last_flush_count: u64,
}
60
61impl RabbitMqStreamsEgressStatus {
62 fn status_fields(&self) -> serde_json::Map<String, serde_json::Value> {
63 let mut details = serde_json::Map::new();
64 details.insert(
65 "stream".to_string(),
66 serde_json::Value::String(self.stream.clone()),
67 );
68 details.insert(
69 "publish_state".to_string(),
70 serde_json::Value::String(
71 if self.pending_messages > 0 {
72 "buffering"
73 } else if self.last_confirmed_publishing_id.is_some() {
74 "confirmed"
75 } else {
76 "idle"
77 }
78 .to_string(),
79 ),
80 );
81 details.insert(
82 "pending_messages".to_string(),
83 serde_json::Value::from(self.pending_messages),
84 );
85 details.insert(
86 "last_flush_count".to_string(),
87 serde_json::Value::from(self.last_flush_count),
88 );
89 if let Some(publisher_name) = &self.publisher_name {
90 details.insert(
91 "publisher_name".to_string(),
92 serde_json::Value::String(publisher_name.clone()),
93 );
94 }
95 if let Some(publishing_id) = self.last_confirmed_publishing_id {
96 details.insert(
97 "last_confirmed_publishing_id".to_string(),
98 serde_json::Value::from(publishing_id),
99 );
100 }
101 details
102 }
103}
104
105fn format_offset_specification(offset: &OffsetSpecification) -> String {
106 match offset {
107 OffsetSpecification::First => "first".to_string(),
108 OffsetSpecification::Last => "last".to_string(),
109 OffsetSpecification::Next => "next".to_string(),
110 OffsetSpecification::Offset(value) => format!("offset:{value}"),
111 OffsetSpecification::Timestamp(value) => format!("timestamp:{value}"),
112 }
113}
114
/// Blocking source of stream deliveries used by the ingress adapter.
///
/// Abstracted behind a trait so the adapter can be driven by a substitute
/// client (see the test-only `with_client` constructor).
pub(crate) trait RabbitMqStreamsIngressClient: Send {
    /// Waits up to `timeout` for the next delivery.
    ///
    /// Returns `Ok(Some(message))` when a delivery is available and
    /// `Ok(None)` when there is nothing to hand back; the tokio-backed
    /// implementation in this file returns `Ok(None)` both when the timeout
    /// elapses and when the consumer stream ends.
    ///
    /// # Errors
    ///
    /// Returns an [`AdapterError`] when the underlying client reports a
    /// failure.
    fn read_next(
        &mut self,
        timeout: Duration,
    ) -> Result<Option<RabbitMqStreamsIngressMessage>, AdapterError>;
}
122
/// Blocking publisher used by the egress adapter.
///
/// Abstracted behind a trait so the adapter can be driven by a substitute
/// client (see the test-only `with_client` constructor).
pub(crate) trait RabbitMqStreamsEgressClient: Send {
    /// Publishes `payload`, waiting up to `timeout`, and returns the
    /// publishing id of the message on success.
    ///
    /// # Errors
    ///
    /// Returns an [`AdapterError`] when publishing fails; the tokio-backed
    /// implementation in this file also reports an elapsed timeout as an
    /// error.
    fn publish(&mut self, payload: &[u8], timeout: Duration) -> Result<u64, AdapterError>;
}
127
/// Connection parameters extracted from a `server_url`.
struct RabbitMqStreamsConnectionConfig {
    /// Broker host name or address (IPv6 literals are stored without brackets).
    host: String,
    /// Broker port; the parser falls back to 5552 when the URL has none.
    port: u16,
    /// User name from the URL's userinfo, if a non-empty one was given.
    username: Option<String>,
    /// Password from the URL's userinfo, if a non-empty one was given.
    password: Option<String>,
    /// Virtual host, always starting with `/`.
    virtual_host: String,
}
135
136impl RabbitMqStreamsConnectionConfig {
137 fn parse(server_url: &str) -> Result<Self, AdapterError> {
138 let (scheme, remainder) =
139 server_url
140 .split_once("://")
141 .ok_or_else(|| AdapterError::Config {
142 detail: format!(
143 "rabbitmq streams server_url must include a scheme: {server_url}"
144 ),
145 })?;
146
147 if matches!(scheme, "amqps" | "rabbitmqs") {
148 return Err(AdapterError::Config {
149 detail: format!(
150 "rabbitmq streams TLS server URLs are not yet supported: {server_url}"
151 ),
152 });
153 }
154 if !matches!(scheme, "amqp" | "rabbitmq") {
155 return Err(AdapterError::Config {
156 detail: format!("unsupported rabbitmq streams server_url scheme: {scheme}"),
157 });
158 }
159
160 let (authority, path_and_more) = remainder.split_once('/').unwrap_or((remainder, ""));
161 let path = path_and_more
162 .split_once('?')
163 .map(|(path, _)| path)
164 .unwrap_or(path_and_more)
165 .split_once('#')
166 .map(|(path, _)| path)
167 .unwrap_or(path_and_more);
168
169 let (userinfo, host_port) = authority
170 .rsplit_once('@')
171 .map(|(userinfo, host_port)| (Some(userinfo), host_port))
172 .unwrap_or((None, authority));
173
174 let (username, password) = match userinfo {
175 Some(userinfo) => {
176 let (username, password) = userinfo.split_once(':').unwrap_or((userinfo, ""));
177 (
178 (!username.is_empty()).then(|| username.to_string()),
179 (!password.is_empty()).then(|| password.to_string()),
180 )
181 }
182 None => (None, None),
183 };
184
185 let (host, port) = parse_host_port(host_port, 5552)?;
186 let virtual_host = if path.is_empty() {
187 "/".to_string()
188 } else {
189 format!("/{path}")
190 };
191
192 Ok(Self {
193 host,
194 port,
195 username,
196 password,
197 virtual_host,
198 })
199 }
200}
201
202fn parse_host_port(host_port: &str, default_port: u16) -> Result<(String, u16), AdapterError> {
203 if host_port.is_empty() {
204 return Err(AdapterError::Config {
205 detail: "rabbitmq streams server_url missing host".into(),
206 });
207 }
208
209 if let Some(rest) = host_port.strip_prefix('[') {
210 let (host, remainder) = rest.split_once(']').ok_or_else(|| AdapterError::Config {
211 detail: format!("invalid IPv6 rabbitmq streams host: {host_port}"),
212 })?;
213 let port = if remainder.is_empty() {
214 default_port
215 } else {
216 let port_text = remainder
217 .strip_prefix(':')
218 .ok_or_else(|| AdapterError::Config {
219 detail: format!("invalid rabbitmq streams host/port: {host_port}"),
220 })?;
221 port_text
222 .parse::<u16>()
223 .map_err(|err| AdapterError::Config {
224 detail: format!("invalid rabbitmq streams port {port_text}: {err}"),
225 })?
226 };
227 return Ok((host.to_string(), port));
228 }
229
230 match host_port.rsplit_once(':') {
231 Some((host, port_text)) if !host.is_empty() && !port_text.is_empty() => {
232 let port = port_text
233 .parse::<u16>()
234 .map_err(|err| AdapterError::Config {
235 detail: format!("invalid rabbitmq streams port {port_text}: {err}"),
236 })?;
237 Ok((host.to_string(), port))
238 }
239 _ => Ok((host_port.to_string(), default_port)),
240 }
241}
242
243fn build_environment(
244 runtime: &Runtime,
245 config: &RabbitMqStreamsConnectionConfig,
246 client_name: Option<&str>,
247) -> Result<Environment, AdapterError> {
248 runtime.block_on(async {
249 let mut builder = Environment::builder()
250 .host(&config.host)
251 .port(config.port)
252 .virtual_host(&config.virtual_host);
253 if let Some(username) = &config.username {
254 builder = builder.username(username);
255 }
256 if let Some(password) = &config.password {
257 builder = builder.password(password);
258 }
259 if let Some(client_name) = client_name {
260 builder = builder.client_provided_name(client_name);
261 }
262 builder.build().await.map_err(|err| AdapterError::RabbitMq {
263 op: "rabbitmq_streams_environment_build",
264 detail: err.to_string(),
265 })
266 })
267}
268
269fn extract_message_payload(message: &Message) -> Result<Vec<u8>, AdapterError> {
270 message
271 .data()
272 .map(|payload| payload.to_vec())
273 .ok_or_else(|| AdapterError::RabbitMq {
274 op: "rabbitmq_streams_extract_payload",
275 detail: "delivery message missing binary data body".into(),
276 })
277}
278
/// Ingress client that drives the async stream consumer from blocking code
/// through a privately owned tokio runtime.
struct AsyncRabbitMqStreamsIngressClient {
    /// Runtime used to block on consumer operations.
    runtime: Runtime,
    /// Environment the consumer was created from; never read after
    /// construction (presumably held so it lives as long as the consumer —
    /// confirm against the client crate).
    _environment: Environment,
    /// Consumer polled by `read_next`.
    consumer: rabbitmq_stream_client::Consumer,
}
284
285impl AsyncRabbitMqStreamsIngressClient {
286 fn connect(
287 server_url: &str,
288 stream: &str,
289 consumer_name: Option<&str>,
290 offset: OffsetSpecification,
291 ) -> Result<Self, AdapterError> {
292 let config = RabbitMqStreamsConnectionConfig::parse(server_url)?;
293 let runtime = tokio::runtime::Builder::new_current_thread()
294 .enable_all()
295 .build()
296 .map_err(|err| AdapterError::RabbitMq {
297 op: "rabbitmq_streams_runtime_build",
298 detail: err.to_string(),
299 })?;
300 let environment = build_environment(&runtime, &config, consumer_name)?;
301 let consumer = runtime.block_on(async {
302 let mut builder = environment.consumer().offset(offset);
303 if let Some(consumer_name) = consumer_name {
304 builder = builder
305 .name(consumer_name)
306 .client_provided_name(consumer_name);
307 }
308 builder
309 .build(stream)
310 .await
311 .map_err(|err| AdapterError::RabbitMq {
312 op: "rabbitmq_streams_consumer_build",
313 detail: err.to_string(),
314 })
315 })?;
316 Ok(Self {
317 runtime,
318 _environment: environment,
319 consumer,
320 })
321 }
322}
323
324impl RabbitMqStreamsIngressClient for AsyncRabbitMqStreamsIngressClient {
325 fn read_next(
326 &mut self,
327 timeout: Duration,
328 ) -> Result<Option<RabbitMqStreamsIngressMessage>, AdapterError> {
329 let consumer = &mut self.consumer;
330 self.runtime.block_on(async move {
331 match tokio::time::timeout(timeout, consumer.next()).await {
332 Ok(Some(Ok(delivery))) => Ok(Some(RabbitMqStreamsIngressMessage {
333 payload: extract_message_payload(delivery.message())?,
334 offset: delivery.offset(),
335 })),
336 Ok(Some(Err(err))) => Err(AdapterError::RabbitMq {
337 op: "rabbitmq_streams_consumer_next",
338 detail: err.to_string(),
339 }),
340 Ok(None) => Ok(None),
341 Err(_) => Ok(None),
342 }
343 })
344 }
345}
346
/// Egress client that drives the async stream producer from blocking code
/// through a privately owned tokio runtime.
struct AsyncRabbitMqStreamsEgressClient {
    /// Runtime used to block on producer operations.
    runtime: Runtime,
    /// Environment the producer was created from; never read after
    /// construction (presumably held so it lives as long as the producer —
    /// confirm against the client crate).
    _environment: Environment,
    /// Producer used by `publish`.
    producer: Producer<NoDedup>,
}
352
353impl AsyncRabbitMqStreamsEgressClient {
354 fn connect(
355 server_url: &str,
356 stream: &str,
357 publisher_name: Option<&str>,
358 ) -> Result<Self, AdapterError> {
359 let config = RabbitMqStreamsConnectionConfig::parse(server_url)?;
360 let runtime = tokio::runtime::Builder::new_current_thread()
361 .enable_all()
362 .build()
363 .map_err(|err| AdapterError::RabbitMq {
364 op: "rabbitmq_streams_runtime_build",
365 detail: err.to_string(),
366 })?;
367 let environment = build_environment(&runtime, &config, publisher_name)?;
368 let producer = runtime.block_on(async {
369 let mut builder = environment.producer();
370 if let Some(publisher_name) = publisher_name {
371 builder = builder.client_provided_name(publisher_name);
372 }
373 builder
374 .build(stream)
375 .await
376 .map_err(|err| AdapterError::RabbitMq {
377 op: "rabbitmq_streams_producer_build",
378 detail: err.to_string(),
379 })
380 })?;
381 Ok(Self {
382 runtime,
383 _environment: environment,
384 producer,
385 })
386 }
387}
388
389impl RabbitMqStreamsEgressClient for AsyncRabbitMqStreamsEgressClient {
390 fn publish(&mut self, payload: &[u8], timeout: Duration) -> Result<u64, AdapterError> {
391 let producer = &mut self.producer;
392 let message = Message::builder().body(payload.to_vec()).build();
393 self.runtime.block_on(async move {
394 match tokio::time::timeout(timeout, producer.send_with_confirm(message)).await {
395 Ok(Ok(confirmation)) => Ok(confirmation.publishing_id()),
396 Ok(Err(err)) => Err(AdapterError::RabbitMq {
397 op: "rabbitmq_streams_publish",
398 detail: err.to_string(),
399 }),
400 Err(_) => Err(AdapterError::RabbitMq {
401 op: "rabbitmq_streams_publish",
402 detail: format!("publish timed out after {} ms", timeout.as_millis()),
403 }),
404 }
405 })
406 }
407}
408
/// Ingress adapter that reads messages from a RabbitMQ stream.
pub struct RabbitMqStreamsIngress {
    /// Adapter name returned by `Adapter::name`.
    name: String,
    /// Client used to fetch deliveries.
    client: Box<dyn RabbitMqStreamsIngressClient>,
    /// Maximum time a single `read_next` call waits for a delivery.
    read_timeout: Duration,
    /// Largest payload, in bytes, that `read_next` accepts.
    max_message_bytes: usize,
    /// Status snapshot exposed through `Adapter::status_fields`.
    status: Arc<Mutex<RabbitMqStreamsIngressStatus>>,
}
420
421impl RabbitMqStreamsIngress {
422 pub fn new(
424 name: impl Into<String>,
425 server_url: &str,
426 stream: &str,
427 consumer_name: Option<&str>,
428 offset: OffsetSpecification,
429 read_timeout: Duration,
430 max_message_bytes: usize,
431 ) -> Result<Self, AdapterError> {
432 let offset_specification = format_offset_specification(&offset);
433 let client =
434 AsyncRabbitMqStreamsIngressClient::connect(server_url, stream, consumer_name, offset)?;
435 Ok(Self {
436 name: name.into(),
437 client: Box::new(client),
438 read_timeout,
439 max_message_bytes,
440 status: Arc::new(Mutex::new(RabbitMqStreamsIngressStatus {
441 stream: stream.to_string(),
442 consumer_name: consumer_name.map(ToOwned::to_owned),
443 offset_specification,
444 last_delivery_offset: None,
445 })),
446 })
447 }
448
449 #[cfg(test)]
450 pub(crate) fn with_client(
452 name: impl Into<String>,
453 client: impl RabbitMqStreamsIngressClient + 'static,
454 read_timeout: Duration,
455 max_message_bytes: usize,
456 ) -> Self {
457 Self {
458 name: name.into(),
459 client: Box::new(client),
460 read_timeout,
461 max_message_bytes,
462 status: Arc::new(Mutex::new(RabbitMqStreamsIngressStatus {
463 stream: "test-stream".to_string(),
464 consumer_name: None,
465 offset_specification: "test".to_string(),
466 last_delivery_offset: None,
467 })),
468 }
469 }
470}
471
472impl Adapter for RabbitMqStreamsIngress {
473 fn name(&self) -> &str {
474 &self.name
475 }
476
477 fn transport_kind(&self) -> &str {
478 "rabbitmq_streams"
479 }
480
481 fn status_fields(&self) -> serde_json::Map<String, serde_json::Value> {
482 self.status.lock().unwrap().status_fields()
483 }
484}
485
486impl IngressAdapter for RabbitMqStreamsIngress {
487 fn read_next(&mut self, out: &mut [u8]) -> Result<Option<usize>, AdapterError> {
488 let Some(message) = self.client.read_next(self.read_timeout)? else {
489 return Ok(None);
490 };
491 let payload = message.payload;
492 if payload.len() > self.max_message_bytes || payload.len() > out.len() {
493 return Err(AdapterError::RabbitMq {
494 op: "rabbitmq_streams_read_next",
495 detail: format!(
496 "payload too large len={} max={} buf={}",
497 payload.len(),
498 self.max_message_bytes,
499 out.len()
500 ),
501 });
502 }
503 self.status.lock().unwrap().last_delivery_offset = Some(message.offset);
504 out[..payload.len()].copy_from_slice(&payload);
505 Ok(Some(payload.len()))
506 }
507}
508
/// Egress adapter that buffers messages and publishes them to a RabbitMQ
/// stream on `flush`.
pub struct RabbitMqStreamsEgress {
    /// Adapter name returned by `Adapter::name`.
    name: String,
    /// Client used to publish messages.
    client: Box<dyn RabbitMqStreamsEgressClient>,
    /// Timeout passed to the client for each individual publish during `flush`.
    flush_timeout: Duration,
    /// Payloads queued by `write_msg` and not yet handed to the client.
    pending: Vec<Vec<u8>>,
    /// Status snapshot exposed through `Adapter::status_fields`.
    status: Arc<Mutex<RabbitMqStreamsEgressStatus>>,
}
519
520impl RabbitMqStreamsEgress {
521 pub fn new(
523 name: impl Into<String>,
524 server_url: &str,
525 stream: &str,
526 publisher_name: Option<&str>,
527 flush_timeout: Duration,
528 ) -> Result<Self, AdapterError> {
529 let client = AsyncRabbitMqStreamsEgressClient::connect(server_url, stream, publisher_name)?;
530 Ok(Self {
531 name: name.into(),
532 client: Box::new(client),
533 flush_timeout,
534 pending: Vec::new(),
535 status: Arc::new(Mutex::new(RabbitMqStreamsEgressStatus {
536 stream: stream.to_string(),
537 publisher_name: publisher_name.map(ToOwned::to_owned),
538 last_confirmed_publishing_id: None,
539 pending_messages: 0,
540 last_flush_count: 0,
541 })),
542 })
543 }
544
545 #[cfg(test)]
546 pub(crate) fn with_client(
548 name: impl Into<String>,
549 client: impl RabbitMqStreamsEgressClient + 'static,
550 flush_timeout: Duration,
551 ) -> Self {
552 Self {
553 name: name.into(),
554 client: Box::new(client),
555 flush_timeout,
556 pending: Vec::new(),
557 status: Arc::new(Mutex::new(RabbitMqStreamsEgressStatus {
558 stream: "test-stream".to_string(),
559 publisher_name: None,
560 last_confirmed_publishing_id: None,
561 pending_messages: 0,
562 last_flush_count: 0,
563 })),
564 }
565 }
566}
567
568impl Adapter for RabbitMqStreamsEgress {
569 fn name(&self) -> &str {
570 &self.name
571 }
572
573 fn transport_kind(&self) -> &str {
574 "rabbitmq_streams"
575 }
576
577 fn status_fields(&self) -> serde_json::Map<String, serde_json::Value> {
578 self.status.lock().unwrap().status_fields()
579 }
580}
581
582impl EgressAdapter for RabbitMqStreamsEgress {
583 fn write_msg(&mut self, msg: &[u8]) -> Result<(), AdapterError> {
584 self.pending.push(msg.to_vec());
585 self.status.lock().unwrap().pending_messages = self.pending.len() as u64;
586 Ok(())
587 }
588
589 fn flush(&mut self) -> Result<(), AdapterError> {
590 let pending = mem::take(&mut self.pending);
591 let flushed_count = pending.len() as u64;
592 for payload in pending {
593 let publishing_id = self.client.publish(&payload, self.flush_timeout)?;
594 self.status.lock().unwrap().last_confirmed_publishing_id = Some(publishing_id);
595 }
596 let mut status = self.status.lock().unwrap();
597 status.pending_messages = 0;
598 status.last_flush_count = flushed_count;
599 Ok(())
600 }
601}