byteor_adapters/
nats.rs

1use std::time::Duration;
2
3use futures_util::StreamExt;
4use tokio::runtime::Runtime;
5
6use crate::{Adapter, AdapterError, EgressAdapter, IngressAdapter};
7
8/// Internal NATS ingress client contract used by the adapter and crate-local tests.
9pub(crate) trait NatsIngressClient: Send {
10    fn read_next(&mut self, timeout: Duration) -> Result<Option<Vec<u8>>, AdapterError>;
11}
12
13/// Internal NATS egress client contract used by the adapter and crate-local tests.
14pub(crate) trait NatsEgressClient: Send {
15    fn publish(&mut self, payload: &[u8]) -> Result<(), AdapterError>;
16    fn flush(&mut self) -> Result<(), AdapterError>;
17}
18
19struct AsyncNatsIngressClient {
20    runtime: Runtime,
21    subscriber: async_nats::Subscriber,
22}
23
24impl AsyncNatsIngressClient {
25    fn connect(
26        server_url: &str,
27        subject: &str,
28        queue_group: Option<&str>,
29    ) -> Result<Self, AdapterError> {
30        let runtime = tokio::runtime::Builder::new_current_thread()
31            .enable_all()
32            .build()
33            .map_err(|err| AdapterError::Nats {
34                op: "nats_runtime_build",
35                detail: err.to_string(),
36            })?;
37        let (client, subscriber) = runtime.block_on(async {
38            let client =
39                async_nats::connect(server_url)
40                    .await
41                    .map_err(|err| AdapterError::Nats {
42                        op: "nats_connect",
43                        detail: err.to_string(),
44                    })?;
45            let subscriber = match queue_group {
46                Some(group) => client
47                    .queue_subscribe(subject.to_string(), group.to_string())
48                    .await
49                    .map_err(|err| AdapterError::Nats {
50                        op: "nats_queue_subscribe",
51                        detail: err.to_string(),
52                    })?,
53                None => client.subscribe(subject.to_string()).await.map_err(|err| {
54                    AdapterError::Nats {
55                        op: "nats_subscribe",
56                        detail: err.to_string(),
57                    }
58                })?,
59            };
60            Ok::<_, AdapterError>((client, subscriber))
61        })?;
62        drop(client);
63        Ok(Self {
64            runtime,
65            subscriber,
66        })
67    }
68}
69
70impl NatsIngressClient for AsyncNatsIngressClient {
71    fn read_next(&mut self, timeout: Duration) -> Result<Option<Vec<u8>>, AdapterError> {
72        let subscriber = &mut self.subscriber;
73        self.runtime.block_on(async move {
74            match tokio::time::timeout(timeout, subscriber.next()).await {
75                Ok(Some(message)) => Ok(Some(message.payload.to_vec())),
76                Ok(None) => Ok(None),
77                Err(_) => Ok(None),
78            }
79        })
80    }
81}
82
83struct AsyncNatsEgressClient {
84    runtime: Runtime,
85    client: async_nats::Client,
86    subject: String,
87}
88
89impl AsyncNatsEgressClient {
90    fn connect(server_url: &str, subject: impl Into<String>) -> Result<Self, AdapterError> {
91        let runtime = tokio::runtime::Builder::new_current_thread()
92            .enable_all()
93            .build()
94            .map_err(|err| AdapterError::Nats {
95                op: "nats_runtime_build",
96                detail: err.to_string(),
97            })?;
98        let client = runtime.block_on(async {
99            async_nats::connect(server_url)
100                .await
101                .map_err(|err| AdapterError::Nats {
102                    op: "nats_connect",
103                    detail: err.to_string(),
104                })
105        })?;
106        Ok(Self {
107            runtime,
108            client,
109            subject: subject.into(),
110        })
111    }
112}
113
114impl NatsEgressClient for AsyncNatsEgressClient {
115    fn publish(&mut self, payload: &[u8]) -> Result<(), AdapterError> {
116        let client = self.client.clone();
117        let subject = self.subject.clone();
118        let payload = payload.to_vec();
119        self.runtime.block_on(async move {
120            client
121                .publish(subject, payload.into())
122                .await
123                .map_err(|err| AdapterError::Nats {
124                    op: "nats_publish",
125                    detail: err.to_string(),
126                })
127        })
128    }
129
130    fn flush(&mut self) -> Result<(), AdapterError> {
131        let client = self.client.clone();
132        self.runtime.block_on(async move {
133            client.flush().await.map_err(|err| AdapterError::Nats {
134                op: "nats_flush",
135                detail: err.to_string(),
136            })
137        })
138    }
139}
140
141/// Minimal blocking NATS ingress adapter.
142///
143/// This adapter subscribes to one NATS subject and exposes each message payload as one ingress
144/// message.
145pub struct NatsIngress {
146    name: String,
147    client: Box<dyn NatsIngressClient>,
148    read_timeout: Duration,
149    max_message_bytes: usize,
150}
151
152impl NatsIngress {
153    /// Create a blocking NATS ingress adapter.
154    pub fn new(
155        name: impl Into<String>,
156        server_url: &str,
157        subject: &str,
158        queue_group: Option<&str>,
159        read_timeout: Duration,
160        max_message_bytes: usize,
161    ) -> Result<Self, AdapterError> {
162        let client = AsyncNatsIngressClient::connect(server_url, subject, queue_group)?;
163        Ok(Self {
164            name: name.into(),
165            client: Box::new(client),
166            read_timeout,
167            max_message_bytes,
168        })
169    }
170
171    #[cfg(test)]
172    /// Build a NATS ingress adapter around a crate-local fake client.
173    pub(crate) fn with_client(
174        name: impl Into<String>,
175        client: impl NatsIngressClient + 'static,
176        read_timeout: Duration,
177        max_message_bytes: usize,
178    ) -> Self {
179        Self {
180            name: name.into(),
181            client: Box::new(client),
182            read_timeout,
183            max_message_bytes,
184        }
185    }
186}
187
188impl Adapter for NatsIngress {
189    fn name(&self) -> &str {
190        &self.name
191    }
192
193    fn transport_kind(&self) -> &str {
194        "nats"
195    }
196}
197
198impl IngressAdapter for NatsIngress {
199    fn read_next(&mut self, out: &mut [u8]) -> Result<Option<usize>, AdapterError> {
200        let Some(payload) = self.client.read_next(self.read_timeout)? else {
201            return Ok(None);
202        };
203        if payload.len() > self.max_message_bytes || payload.len() > out.len() {
204            return Err(AdapterError::Nats {
205                op: "nats_read_next",
206                detail: format!(
207                    "payload too large len={} max={} buf={}",
208                    payload.len(),
209                    self.max_message_bytes,
210                    out.len()
211                ),
212            });
213        }
214        out[..payload.len()].copy_from_slice(&payload);
215        Ok(Some(payload.len()))
216    }
217}
218
219/// Minimal blocking NATS egress adapter.
220///
221/// Each `write_msg()` publishes one message to the configured subject.
222pub struct NatsEgress {
223    name: String,
224    client: Box<dyn NatsEgressClient>,
225}
226
227impl NatsEgress {
228    /// Create a blocking NATS egress adapter.
229    pub fn new(
230        name: impl Into<String>,
231        server_url: &str,
232        subject: &str,
233    ) -> Result<Self, AdapterError> {
234        let client = AsyncNatsEgressClient::connect(server_url, subject)?;
235        Ok(Self {
236            name: name.into(),
237            client: Box::new(client),
238        })
239    }
240
241    #[cfg(test)]
242    /// Build a NATS egress adapter around a crate-local fake client.
243    pub(crate) fn with_client(
244        name: impl Into<String>,
245        client: impl NatsEgressClient + 'static,
246    ) -> Self {
247        Self {
248            name: name.into(),
249            client: Box::new(client),
250        }
251    }
252}
253
254impl Adapter for NatsEgress {
255    fn name(&self) -> &str {
256        &self.name
257    }
258
259    fn transport_kind(&self) -> &str {
260        "nats"
261    }
262}
263
264impl EgressAdapter for NatsEgress {
265    fn write_msg(&mut self, msg: &[u8]) -> Result<(), AdapterError> {
266        self.client.publish(msg)
267    }
268
269    fn flush(&mut self) -> Result<(), AdapterError> {
270        self.client.flush()
271    }
272}