byteor_adapters/
rabbitmq_streams.rs

1use std::mem;
2use std::sync::{Arc, Mutex};
3use std::time::Duration;
4
5use futures_util::StreamExt;
6use rabbitmq_stream_client::types::{Message, OffsetSpecification};
7use rabbitmq_stream_client::{Environment, NoDedup, Producer};
8use tokio::runtime::Runtime;
9
10use crate::{Adapter, AdapterError, EgressAdapter, IngressAdapter};
11
12pub(crate) struct RabbitMqStreamsIngressMessage {
13    pub payload: Vec<u8>,
14    pub offset: u64,
15}
16
17#[derive(Debug)]
18struct RabbitMqStreamsIngressStatus {
19    stream: String,
20    consumer_name: Option<String>,
21    offset_specification: String,
22    last_delivery_offset: Option<u64>,
23}
24
25impl RabbitMqStreamsIngressStatus {
26    fn status_fields(&self) -> serde_json::Map<String, serde_json::Value> {
27        let mut details = serde_json::Map::new();
28        details.insert(
29            "stream".to_string(),
30            serde_json::Value::String(self.stream.clone()),
31        );
32        details.insert(
33            "offset_specification".to_string(),
34            serde_json::Value::String(self.offset_specification.clone()),
35        );
36        if let Some(consumer_name) = &self.consumer_name {
37            details.insert(
38                "consumer_name".to_string(),
39                serde_json::Value::String(consumer_name.clone()),
40            );
41        }
42        if let Some(offset) = self.last_delivery_offset {
43            details.insert(
44                "consumer_offset".to_string(),
45                serde_json::Value::from(offset),
46            );
47        }
48        details
49    }
50}
51
52#[derive(Debug)]
53struct RabbitMqStreamsEgressStatus {
54    stream: String,
55    publisher_name: Option<String>,
56    last_confirmed_publishing_id: Option<u64>,
57    pending_messages: u64,
58    last_flush_count: u64,
59}
60
61impl RabbitMqStreamsEgressStatus {
62    fn status_fields(&self) -> serde_json::Map<String, serde_json::Value> {
63        let mut details = serde_json::Map::new();
64        details.insert(
65            "stream".to_string(),
66            serde_json::Value::String(self.stream.clone()),
67        );
68        details.insert(
69            "publish_state".to_string(),
70            serde_json::Value::String(
71                if self.pending_messages > 0 {
72                    "buffering"
73                } else if self.last_confirmed_publishing_id.is_some() {
74                    "confirmed"
75                } else {
76                    "idle"
77                }
78                .to_string(),
79            ),
80        );
81        details.insert(
82            "pending_messages".to_string(),
83            serde_json::Value::from(self.pending_messages),
84        );
85        details.insert(
86            "last_flush_count".to_string(),
87            serde_json::Value::from(self.last_flush_count),
88        );
89        if let Some(publisher_name) = &self.publisher_name {
90            details.insert(
91                "publisher_name".to_string(),
92                serde_json::Value::String(publisher_name.clone()),
93            );
94        }
95        if let Some(publishing_id) = self.last_confirmed_publishing_id {
96            details.insert(
97                "last_confirmed_publishing_id".to_string(),
98                serde_json::Value::from(publishing_id),
99            );
100        }
101        details
102    }
103}
104
105fn format_offset_specification(offset: &OffsetSpecification) -> String {
106    match offset {
107        OffsetSpecification::First => "first".to_string(),
108        OffsetSpecification::Last => "last".to_string(),
109        OffsetSpecification::Next => "next".to_string(),
110        OffsetSpecification::Offset(value) => format!("offset:{value}"),
111        OffsetSpecification::Timestamp(value) => format!("timestamp:{value}"),
112    }
113}
114
115/// Internal RabbitMQ Streams ingress client contract used by the adapter and crate-local tests.
116pub(crate) trait RabbitMqStreamsIngressClient: Send {
117    fn read_next(
118        &mut self,
119        timeout: Duration,
120    ) -> Result<Option<RabbitMqStreamsIngressMessage>, AdapterError>;
121}
122
123/// Internal RabbitMQ Streams egress client contract used by the adapter and crate-local tests.
124pub(crate) trait RabbitMqStreamsEgressClient: Send {
125    fn publish(&mut self, payload: &[u8], timeout: Duration) -> Result<u64, AdapterError>;
126}
127
128struct RabbitMqStreamsConnectionConfig {
129    host: String,
130    port: u16,
131    username: Option<String>,
132    password: Option<String>,
133    virtual_host: String,
134}
135
136impl RabbitMqStreamsConnectionConfig {
137    fn parse(server_url: &str) -> Result<Self, AdapterError> {
138        let (scheme, remainder) =
139            server_url
140                .split_once("://")
141                .ok_or_else(|| AdapterError::Config {
142                    detail: format!(
143                        "rabbitmq streams server_url must include a scheme: {server_url}"
144                    ),
145                })?;
146
147        if matches!(scheme, "amqps" | "rabbitmqs") {
148            return Err(AdapterError::Config {
149                detail: format!(
150                    "rabbitmq streams TLS server URLs are not yet supported: {server_url}"
151                ),
152            });
153        }
154        if !matches!(scheme, "amqp" | "rabbitmq") {
155            return Err(AdapterError::Config {
156                detail: format!("unsupported rabbitmq streams server_url scheme: {scheme}"),
157            });
158        }
159
160        let (authority, path_and_more) = remainder.split_once('/').unwrap_or((remainder, ""));
161        let path = path_and_more
162            .split_once('?')
163            .map(|(path, _)| path)
164            .unwrap_or(path_and_more)
165            .split_once('#')
166            .map(|(path, _)| path)
167            .unwrap_or(path_and_more);
168
169        let (userinfo, host_port) = authority
170            .rsplit_once('@')
171            .map(|(userinfo, host_port)| (Some(userinfo), host_port))
172            .unwrap_or((None, authority));
173
174        let (username, password) = match userinfo {
175            Some(userinfo) => {
176                let (username, password) = userinfo.split_once(':').unwrap_or((userinfo, ""));
177                (
178                    (!username.is_empty()).then(|| username.to_string()),
179                    (!password.is_empty()).then(|| password.to_string()),
180                )
181            }
182            None => (None, None),
183        };
184
185        let (host, port) = parse_host_port(host_port, 5552)?;
186        let virtual_host = if path.is_empty() {
187            "/".to_string()
188        } else {
189            format!("/{path}")
190        };
191
192        Ok(Self {
193            host,
194            port,
195            username,
196            password,
197            virtual_host,
198        })
199    }
200}
201
202fn parse_host_port(host_port: &str, default_port: u16) -> Result<(String, u16), AdapterError> {
203    if host_port.is_empty() {
204        return Err(AdapterError::Config {
205            detail: "rabbitmq streams server_url missing host".into(),
206        });
207    }
208
209    if let Some(rest) = host_port.strip_prefix('[') {
210        let (host, remainder) = rest.split_once(']').ok_or_else(|| AdapterError::Config {
211            detail: format!("invalid IPv6 rabbitmq streams host: {host_port}"),
212        })?;
213        let port = if remainder.is_empty() {
214            default_port
215        } else {
216            let port_text = remainder
217                .strip_prefix(':')
218                .ok_or_else(|| AdapterError::Config {
219                    detail: format!("invalid rabbitmq streams host/port: {host_port}"),
220                })?;
221            port_text
222                .parse::<u16>()
223                .map_err(|err| AdapterError::Config {
224                    detail: format!("invalid rabbitmq streams port {port_text}: {err}"),
225                })?
226        };
227        return Ok((host.to_string(), port));
228    }
229
230    match host_port.rsplit_once(':') {
231        Some((host, port_text)) if !host.is_empty() && !port_text.is_empty() => {
232            let port = port_text
233                .parse::<u16>()
234                .map_err(|err| AdapterError::Config {
235                    detail: format!("invalid rabbitmq streams port {port_text}: {err}"),
236                })?;
237            Ok((host.to_string(), port))
238        }
239        _ => Ok((host_port.to_string(), default_port)),
240    }
241}
242
243fn build_environment(
244    runtime: &Runtime,
245    config: &RabbitMqStreamsConnectionConfig,
246    client_name: Option<&str>,
247) -> Result<Environment, AdapterError> {
248    runtime.block_on(async {
249        let mut builder = Environment::builder()
250            .host(&config.host)
251            .port(config.port)
252            .virtual_host(&config.virtual_host);
253        if let Some(username) = &config.username {
254            builder = builder.username(username);
255        }
256        if let Some(password) = &config.password {
257            builder = builder.password(password);
258        }
259        if let Some(client_name) = client_name {
260            builder = builder.client_provided_name(client_name);
261        }
262        builder.build().await.map_err(|err| AdapterError::RabbitMq {
263            op: "rabbitmq_streams_environment_build",
264            detail: err.to_string(),
265        })
266    })
267}
268
269fn extract_message_payload(message: &Message) -> Result<Vec<u8>, AdapterError> {
270    message
271        .data()
272        .map(|payload| payload.to_vec())
273        .ok_or_else(|| AdapterError::RabbitMq {
274            op: "rabbitmq_streams_extract_payload",
275            detail: "delivery message missing binary data body".into(),
276        })
277}
278
279struct AsyncRabbitMqStreamsIngressClient {
280    runtime: Runtime,
281    _environment: Environment,
282    consumer: rabbitmq_stream_client::Consumer,
283}
284
285impl AsyncRabbitMqStreamsIngressClient {
286    fn connect(
287        server_url: &str,
288        stream: &str,
289        consumer_name: Option<&str>,
290        offset: OffsetSpecification,
291    ) -> Result<Self, AdapterError> {
292        let config = RabbitMqStreamsConnectionConfig::parse(server_url)?;
293        let runtime = tokio::runtime::Builder::new_current_thread()
294            .enable_all()
295            .build()
296            .map_err(|err| AdapterError::RabbitMq {
297                op: "rabbitmq_streams_runtime_build",
298                detail: err.to_string(),
299            })?;
300        let environment = build_environment(&runtime, &config, consumer_name)?;
301        let consumer = runtime.block_on(async {
302            let mut builder = environment.consumer().offset(offset);
303            if let Some(consumer_name) = consumer_name {
304                builder = builder
305                    .name(consumer_name)
306                    .client_provided_name(consumer_name);
307            }
308            builder
309                .build(stream)
310                .await
311                .map_err(|err| AdapterError::RabbitMq {
312                    op: "rabbitmq_streams_consumer_build",
313                    detail: err.to_string(),
314                })
315        })?;
316        Ok(Self {
317            runtime,
318            _environment: environment,
319            consumer,
320        })
321    }
322}
323
324impl RabbitMqStreamsIngressClient for AsyncRabbitMqStreamsIngressClient {
325    fn read_next(
326        &mut self,
327        timeout: Duration,
328    ) -> Result<Option<RabbitMqStreamsIngressMessage>, AdapterError> {
329        let consumer = &mut self.consumer;
330        self.runtime.block_on(async move {
331            match tokio::time::timeout(timeout, consumer.next()).await {
332                Ok(Some(Ok(delivery))) => Ok(Some(RabbitMqStreamsIngressMessage {
333                    payload: extract_message_payload(delivery.message())?,
334                    offset: delivery.offset(),
335                })),
336                Ok(Some(Err(err))) => Err(AdapterError::RabbitMq {
337                    op: "rabbitmq_streams_consumer_next",
338                    detail: err.to_string(),
339                }),
340                Ok(None) => Ok(None),
341                Err(_) => Ok(None),
342            }
343        })
344    }
345}
346
347struct AsyncRabbitMqStreamsEgressClient {
348    runtime: Runtime,
349    _environment: Environment,
350    producer: Producer<NoDedup>,
351}
352
353impl AsyncRabbitMqStreamsEgressClient {
354    fn connect(
355        server_url: &str,
356        stream: &str,
357        publisher_name: Option<&str>,
358    ) -> Result<Self, AdapterError> {
359        let config = RabbitMqStreamsConnectionConfig::parse(server_url)?;
360        let runtime = tokio::runtime::Builder::new_current_thread()
361            .enable_all()
362            .build()
363            .map_err(|err| AdapterError::RabbitMq {
364                op: "rabbitmq_streams_runtime_build",
365                detail: err.to_string(),
366            })?;
367        let environment = build_environment(&runtime, &config, publisher_name)?;
368        let producer = runtime.block_on(async {
369            let mut builder = environment.producer();
370            if let Some(publisher_name) = publisher_name {
371                builder = builder.client_provided_name(publisher_name);
372            }
373            builder
374                .build(stream)
375                .await
376                .map_err(|err| AdapterError::RabbitMq {
377                    op: "rabbitmq_streams_producer_build",
378                    detail: err.to_string(),
379                })
380        })?;
381        Ok(Self {
382            runtime,
383            _environment: environment,
384            producer,
385        })
386    }
387}
388
389impl RabbitMqStreamsEgressClient for AsyncRabbitMqStreamsEgressClient {
390    fn publish(&mut self, payload: &[u8], timeout: Duration) -> Result<u64, AdapterError> {
391        let producer = &mut self.producer;
392        let message = Message::builder().body(payload.to_vec()).build();
393        self.runtime.block_on(async move {
394            match tokio::time::timeout(timeout, producer.send_with_confirm(message)).await {
395                Ok(Ok(confirmation)) => Ok(confirmation.publishing_id()),
396                Ok(Err(err)) => Err(AdapterError::RabbitMq {
397                    op: "rabbitmq_streams_publish",
398                    detail: err.to_string(),
399                }),
400                Err(_) => Err(AdapterError::RabbitMq {
401                    op: "rabbitmq_streams_publish",
402                    detail: format!("publish timed out after {} ms", timeout.as_millis()),
403                }),
404            }
405        })
406    }
407}
408
409/// Minimal blocking RabbitMQ Streams ingress adapter.
410///
411/// This adapter consumes from one RabbitMQ stream and exposes each message body as one ingress
412/// message.
413pub struct RabbitMqStreamsIngress {
414    name: String,
415    client: Box<dyn RabbitMqStreamsIngressClient>,
416    read_timeout: Duration,
417    max_message_bytes: usize,
418    status: Arc<Mutex<RabbitMqStreamsIngressStatus>>,
419}
420
421impl RabbitMqStreamsIngress {
422    /// Create a blocking RabbitMQ Streams ingress adapter.
423    pub fn new(
424        name: impl Into<String>,
425        server_url: &str,
426        stream: &str,
427        consumer_name: Option<&str>,
428        offset: OffsetSpecification,
429        read_timeout: Duration,
430        max_message_bytes: usize,
431    ) -> Result<Self, AdapterError> {
432        let offset_specification = format_offset_specification(&offset);
433        let client =
434            AsyncRabbitMqStreamsIngressClient::connect(server_url, stream, consumer_name, offset)?;
435        Ok(Self {
436            name: name.into(),
437            client: Box::new(client),
438            read_timeout,
439            max_message_bytes,
440            status: Arc::new(Mutex::new(RabbitMqStreamsIngressStatus {
441                stream: stream.to_string(),
442                consumer_name: consumer_name.map(ToOwned::to_owned),
443                offset_specification,
444                last_delivery_offset: None,
445            })),
446        })
447    }
448
449    #[cfg(test)]
450    /// Build a RabbitMQ Streams ingress adapter around a crate-local fake client.
451    pub(crate) fn with_client(
452        name: impl Into<String>,
453        client: impl RabbitMqStreamsIngressClient + 'static,
454        read_timeout: Duration,
455        max_message_bytes: usize,
456    ) -> Self {
457        Self {
458            name: name.into(),
459            client: Box::new(client),
460            read_timeout,
461            max_message_bytes,
462            status: Arc::new(Mutex::new(RabbitMqStreamsIngressStatus {
463                stream: "test-stream".to_string(),
464                consumer_name: None,
465                offset_specification: "test".to_string(),
466                last_delivery_offset: None,
467            })),
468        }
469    }
470}
471
472impl Adapter for RabbitMqStreamsIngress {
473    fn name(&self) -> &str {
474        &self.name
475    }
476
477    fn transport_kind(&self) -> &str {
478        "rabbitmq_streams"
479    }
480
481    fn status_fields(&self) -> serde_json::Map<String, serde_json::Value> {
482        self.status.lock().unwrap().status_fields()
483    }
484}
485
486impl IngressAdapter for RabbitMqStreamsIngress {
487    fn read_next(&mut self, out: &mut [u8]) -> Result<Option<usize>, AdapterError> {
488        let Some(message) = self.client.read_next(self.read_timeout)? else {
489            return Ok(None);
490        };
491        let payload = message.payload;
492        if payload.len() > self.max_message_bytes || payload.len() > out.len() {
493            return Err(AdapterError::RabbitMq {
494                op: "rabbitmq_streams_read_next",
495                detail: format!(
496                    "payload too large len={} max={} buf={}",
497                    payload.len(),
498                    self.max_message_bytes,
499                    out.len()
500                ),
501            });
502        }
503        self.status.lock().unwrap().last_delivery_offset = Some(message.offset);
504        out[..payload.len()].copy_from_slice(&payload);
505        Ok(Some(payload.len()))
506    }
507}
508
509/// Minimal blocking RabbitMQ Streams egress adapter.
510///
511/// Each `flush()` publishes all buffered messages to the configured RabbitMQ stream.
512pub struct RabbitMqStreamsEgress {
513    name: String,
514    client: Box<dyn RabbitMqStreamsEgressClient>,
515    flush_timeout: Duration,
516    pending: Vec<Vec<u8>>,
517    status: Arc<Mutex<RabbitMqStreamsEgressStatus>>,
518}
519
520impl RabbitMqStreamsEgress {
521    /// Create a blocking RabbitMQ Streams egress adapter.
522    pub fn new(
523        name: impl Into<String>,
524        server_url: &str,
525        stream: &str,
526        publisher_name: Option<&str>,
527        flush_timeout: Duration,
528    ) -> Result<Self, AdapterError> {
529        let client = AsyncRabbitMqStreamsEgressClient::connect(server_url, stream, publisher_name)?;
530        Ok(Self {
531            name: name.into(),
532            client: Box::new(client),
533            flush_timeout,
534            pending: Vec::new(),
535            status: Arc::new(Mutex::new(RabbitMqStreamsEgressStatus {
536                stream: stream.to_string(),
537                publisher_name: publisher_name.map(ToOwned::to_owned),
538                last_confirmed_publishing_id: None,
539                pending_messages: 0,
540                last_flush_count: 0,
541            })),
542        })
543    }
544
545    #[cfg(test)]
546    /// Build a RabbitMQ Streams egress adapter around a crate-local fake client.
547    pub(crate) fn with_client(
548        name: impl Into<String>,
549        client: impl RabbitMqStreamsEgressClient + 'static,
550        flush_timeout: Duration,
551    ) -> Self {
552        Self {
553            name: name.into(),
554            client: Box::new(client),
555            flush_timeout,
556            pending: Vec::new(),
557            status: Arc::new(Mutex::new(RabbitMqStreamsEgressStatus {
558                stream: "test-stream".to_string(),
559                publisher_name: None,
560                last_confirmed_publishing_id: None,
561                pending_messages: 0,
562                last_flush_count: 0,
563            })),
564        }
565    }
566}
567
568impl Adapter for RabbitMqStreamsEgress {
569    fn name(&self) -> &str {
570        &self.name
571    }
572
573    fn transport_kind(&self) -> &str {
574        "rabbitmq_streams"
575    }
576
577    fn status_fields(&self) -> serde_json::Map<String, serde_json::Value> {
578        self.status.lock().unwrap().status_fields()
579    }
580}
581
582impl EgressAdapter for RabbitMqStreamsEgress {
583    fn write_msg(&mut self, msg: &[u8]) -> Result<(), AdapterError> {
584        self.pending.push(msg.to_vec());
585        self.status.lock().unwrap().pending_messages = self.pending.len() as u64;
586        Ok(())
587    }
588
589    fn flush(&mut self) -> Result<(), AdapterError> {
590        let pending = mem::take(&mut self.pending);
591        let flushed_count = pending.len() as u64;
592        for payload in pending {
593            let publishing_id = self.client.publish(&payload, self.flush_timeout)?;
594            self.status.lock().unwrap().last_confirmed_publishing_id = Some(publishing_id);
595        }
596        let mut status = self.status.lock().unwrap();
597        status.pending_messages = 0;
598        status.last_flush_count = flushed_count;
599        Ok(())
600    }
601}